@@ -255,6 +255,7 @@
import static io.trino.plugin.hive.metastore.MetastoreUtil.verifyOnline;
import static io.trino.plugin.hive.metastore.PrincipalPrivileges.NO_PRIVILEGES;
import static io.trino.plugin.hive.metastore.PrincipalPrivileges.fromHivePrivilegeInfos;
import static io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore.cleanExtraOutputFiles;
import static io.trino.plugin.hive.metastore.StorageFormat.VIEW_STORAGE_FORMAT;
import static io.trino.plugin.hive.metastore.StorageFormat.fromHiveStorageFormat;
import static io.trino.plugin.hive.util.CompressionConfigUtil.configureCompression;
@@ -1903,6 +1904,10 @@ else if (partitionUpdate.getUpdateMode() == NEW || partitionUpdate.getUpdateMode
if (partitionUpdate.getUpdateMode() == OVERWRITE) {
if (handle.getLocationHandle().getWriteMode() == DIRECT_TO_TARGET_EXISTING_DIRECTORY) {
removeNonCurrentQueryFiles(session, partitionUpdate.getTargetPath());
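// with retries enabled, a failed attempt of this query may have already written
// files directly to the target directory; remove any files created by this query
// that are not in the final set of written files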
if (handle.isRetriesEnabled()) {
HdfsContext hdfsContext = new HdfsContext(session);
cleanExtraOutputFiles(hdfsEnvironment, hdfsContext, session.getQueryId(), partitionUpdate.getTargetPath(), ImmutableSet.copyOf(partitionUpdate.getFileNames()));
}
}
else {
metastore.dropPartition(session, handle.getSchemaName(), handle.getTableName(), partition.getValues(), true);
@@ -1946,7 +1946,7 @@ private void cleanExtraOutputFiles(HdfsContext hdfsContext, String queryId, Part
}
verify(partitionAndMore.hasFileNames(), "fileNames expected to be set if isCleanExtraOutputFilesOnCommit is true");

cleanExtraOutputFiles(hdfsContext, queryId, partitionAndMore.getCurrentLocation(), ImmutableSet.copyOf(partitionAndMore.getFileNames()));
SemiTransactionalHiveMetastore.cleanExtraOutputFiles(hdfsEnvironment, hdfsContext, queryId, partitionAndMore.getCurrentLocation(), ImmutableSet.copyOf(partitionAndMore.getFileNames()));
}

private void cleanExtraOutputFiles(HdfsContext hdfsContext, String queryId, TableAndMore tableAndMore)
@@ -1956,59 +1956,7 @@ private void cleanExtraOutputFiles(HdfsContext hdfsContext, String queryId, Tabl
}
Path tableLocation = tableAndMore.getCurrentLocation().orElseThrow(() -> new IllegalArgumentException("currentLocation expected to be set if isCleanExtraOutputFilesOnCommit is true"));
List<String> files = tableAndMore.getFileNames().orElseThrow(() -> new IllegalArgumentException("fileNames expected to be set if isCleanExtraOutputFilesOnCommit is true"));
cleanExtraOutputFiles(hdfsContext, queryId, tableLocation, ImmutableSet.copyOf(files));
}

private void cleanExtraOutputFiles(HdfsContext hdfsContext, String queryId, Path path, Set<String> filesToKeep)
{
List<String> filesToDelete = new LinkedList<>();
try {
log.debug("Deleting failed attempt files from %s for query %s", path, queryId);
FileSystem fileSystem = hdfsEnvironment.getFileSystem(hdfsContext, path);
if (!fileSystem.exists(path)) {
// directory may not exist if no files were actually written
return;
}

// files are written flat in a single directory so we do not need to list recursively
RemoteIterator<LocatedFileStatus> iterator = fileSystem.listFiles(path, false);
while (iterator.hasNext()) {
Path file = iterator.next().getPath();
if (isFileCreatedByQuery(file.getName(), queryId) && !filesToKeep.contains(file.getName())) {
filesToDelete.add(file.getName());
}
}

ImmutableList.Builder<String> deletedFilesBuilder = ImmutableList.builder();
Iterator<String> filesToDeleteIterator = filesToDelete.iterator();
while (filesToDeleteIterator.hasNext()) {
String fileName = filesToDeleteIterator.next();
log.debug("Deleting failed attempt file %s/%s for query %s", path, fileName, queryId);
fileSystem.delete(new Path(path, fileName), false);
deletedFilesBuilder.add(fileName);
filesToDeleteIterator.remove();
}

List<String> deletedFiles = deletedFilesBuilder.build();
if (!deletedFiles.isEmpty()) {
log.info("Deleted failed attempt files %s from %s for query %s", deletedFiles, path, queryId);
}
}
catch (IOException e) {
// If we fail here, the query will be rolled back. The optimal outcome would be for the rollback to complete successfully and clean up everything for the query.
// Yet if we hit a problem here, the rollback will probably fail as well.
//
// The thrown exception lists the files which we could not delete, so the user can clean those up manually later.
// Note that this is not a bullet-proof solution.
// The rollback routine will still fire and try to clean up the changes the query made; it will probably clean up some and leave some behind.
// Even if the user then removes the failed attempt files, it is not obvious that the table would be in the expected state.
//
// Still, we cannot do much better for non-transactional Hive tables.
throw new TrinoException(
HIVE_FILESYSTEM_ERROR,
format("Error deleting failed retry attempt files from %s; remaining files %s; manual cleanup may be required", path, filesToDelete),
e);
}
SemiTransactionalHiveMetastore.cleanExtraOutputFiles(hdfsEnvironment, hdfsContext, queryId, tableLocation, ImmutableSet.copyOf(files));
}

private PartitionStatistics getExistingPartitionStatistics(Partition partition, String partitionName)
@@ -3687,4 +3635,56 @@ public void commitTransaction(long transactionId)
{
delegate.commitTransaction(transactionId);
}

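/**
 * Removes from {@code path} any files that were created by {@code queryId} but are
 * not present in {@code filesToKeep}, i.e. leftovers of failed write attempts.
 */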
public static void cleanExtraOutputFiles(HdfsEnvironment hdfsEnvironment, HdfsContext hdfsContext, String queryId, Path path, Set<String> filesToKeep)
{
List<String> filesToDelete = new LinkedList<>();
try {
log.debug("Deleting failed attempt files from %s for query %s", path, queryId);
FileSystem fileSystem = hdfsEnvironment.getFileSystem(hdfsContext, path);
if (!fileSystem.exists(path)) {
// directory may not exist if no files were actually written
return;
}

// files are written flat in a single directory so we do not need to list recursively
RemoteIterator<LocatedFileStatus> iterator = fileSystem.listFiles(path, false);
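// a file left over from a failed attempt was created by this query (per its name)
// but is not among the final output files we want to keep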
while (iterator.hasNext()) {
Path file = iterator.next().getPath();
if (isFileCreatedByQuery(file.getName(), queryId) && !filesToKeep.contains(file.getName())) {
filesToDelete.add(file.getName());
}
}

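// delete files one at a time, dropping each from filesToDelete as we go, so that
// the exception handler below can report exactly which files still remain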
ImmutableList.Builder<String> deletedFilesBuilder = ImmutableList.builder();
Iterator<String> filesToDeleteIterator = filesToDelete.iterator();
while (filesToDeleteIterator.hasNext()) {
String fileName = filesToDeleteIterator.next();
log.debug("Deleting failed attempt file %s/%s for query %s", path, fileName, queryId);
fileSystem.delete(new Path(path, fileName), false);
deletedFilesBuilder.add(fileName);
filesToDeleteIterator.remove();
}

List<String> deletedFiles = deletedFilesBuilder.build();
if (!deletedFiles.isEmpty()) {
log.info("Deleted failed attempt files %s from %s for query %s", deletedFiles, path, queryId);
}
}
catch (IOException e) {
// If we fail here, the query will be rolled back. The optimal outcome would be for the rollback to complete successfully and clean up everything for the query.
// Yet if we hit a problem here, the rollback will probably fail as well.
//
// The thrown exception lists the files which we could not delete, so the user can clean those up manually later.
// Note that this is not a bullet-proof solution.
// The rollback routine will still fire and try to clean up the changes the query made; it will probably clean up some and leave some behind.
// Even if the user then removes the failed attempt files, it is not obvious that the table would be in the expected state.
//
// Still, we cannot do much better for non-transactional Hive tables.
throw new TrinoException(
HIVE_FILESYSTEM_ERROR,
format("Error deleting failed retry attempt files from %s; remaining files %s; manual cleanup may be required", path, filesToDelete),
e);
}
}
}