Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.filesystem;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;

/**
 * Static helpers for working with Hadoop {@link FileSystem} instances.
 */
public final class FileSystemUtils
{
    private FileSystemUtils() {}

    /**
     * Unwraps any {@link FilterFileSystem} layers and returns the innermost
     * raw {@link FileSystem} implementation.
     *
     * @param fileSystem possibly wrapped file system
     * @return the innermost file system that is not a {@link FilterFileSystem}
     */
    public static FileSystem getRawFileSystem(FileSystem fileSystem)
    {
        // Iterate instead of recursing: wrappers may be nested arbitrarily deep,
        // and pattern-matching instanceof (used elsewhere in this codebase)
        // avoids the explicit cast
        while (fileSystem instanceof FilterFileSystem filterFileSystem) {
            fileSystem = filterFileSystem.getRawFileSystem();
        }
        return fileSystem;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.apache.iceberg.io.FileIO;

import java.io.IOException;
import java.util.Collection;

public interface TrinoFileSystem
{
Expand All @@ -28,6 +29,14 @@ public interface TrinoFileSystem
/**
 * Deletes the file at the given path.
 *
 * @throws IOException if the file could not be deleted
 */
void deleteFile(String path)
        throws IOException;

/**
 * Deletes the given paths in batches; the operation is not guaranteed to be atomic.
 *
 * @param paths collection of paths to be deleted
 * @throws IOException when there is a problem with deletion of one or more specific paths
 */
void deleteFiles(Collection<String> paths)
        throws IOException;

/**
 * Deletes the directory at the given path.
 * NOTE(review): recursive vs. non-recursive semantics are not visible in this view — confirm with implementations.
 *
 * @throws IOException if the directory could not be deleted
 */
void deleteDirectory(String path)
        throws IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.trino.filesystem.TrinoInputFile;
import io.trino.filesystem.TrinoOutputFile;
import io.trino.filesystem.fileio.ForwardingFileIo;
import io.trino.hdfs.FileSystemWithBatchDelete;
import io.trino.hdfs.HdfsContext;
import io.trino.hdfs.HdfsEnvironment;
import org.apache.hadoop.fs.FileSystem;
Expand All @@ -26,10 +27,18 @@

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import static io.trino.filesystem.FileSystemUtils.getRawFileSystem;
import static io.trino.filesystem.hdfs.HadoopPaths.hadoopPath;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toList;

class HdfsFileSystem
implements TrinoFileSystem
Expand Down Expand Up @@ -75,6 +84,30 @@ public void deleteFile(String path)
});
}

@Override
public void deleteFiles(Collection<String> paths)
throws IOException
{
Map<Path, List<Path>> pathsGroupedByDirectory = paths.stream().collect(
groupingBy(
path -> hadoopPath(path.replaceFirst("/[^/]*$", "")),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this work if the file is in the root directory of the file system?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then it will be grouped by Path("" ) - is it incorrect ?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably something more like s3://bucket/ in the s3 case at least. I think it's fine

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes it should be fine imho

mapping(HadoopPaths::hadoopPath, toList())));
Comment thread
findepi marked this conversation as resolved.
Outdated
for (Entry<Path, List<Path>> directoryWithPaths : pathsGroupedByDirectory.entrySet()) {
FileSystem rawFileSystem = getRawFileSystem(environment.getFileSystem(context, directoryWithPaths.getKey()));
environment.doAs(context.getIdentity(), () -> {
if (rawFileSystem instanceof FileSystemWithBatchDelete fileSystemWithBatchDelete) {
fileSystemWithBatchDelete.deleteFiles(directoryWithPaths.getValue());
}
else {
for (Path path : directoryWithPaths.getValue()) {
rawFileSystem.delete(path, false);
}
}
return null;
});
}
}

@Override
public void deleteDirectory(String path)
throws IOException
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.hdfs;

import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.util.Collection;

/**
 * Capability interface for Hadoop {@code FileSystem} implementations that can
 * delete multiple files more efficiently than one call per file (for example,
 * an object store bulk-delete API).
 */
public interface FileSystemWithBatchDelete
{
    /**
     * Deletes the given files. Not guaranteed to be atomic.
     *
     * @param paths files to delete
     * @throws IOException if one or more paths could not be deleted
     */
    void deleteFiles(Collection<Path> paths)
            throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.iceberg.io.FileIO;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.function.Consumer;

Expand Down Expand Up @@ -98,6 +99,12 @@ public void deleteFile(String path)
throw new UnsupportedOperationException();
}

@Override
public void deleteFiles(Collection<String> paths)
{
    // Batch deletion is intentionally unsupported here, mirroring the
    // deleteFile override above — presumably a test/stub file system;
    // enclosing class is not visible in this view, confirm its intent
    throw new UnsupportedOperationException();
}

@Override
public void deleteDirectory(String path)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
import com.amazonaws.services.s3.model.CopyObjectRequest;
import com.amazonaws.services.s3.model.CryptoConfiguration;
import com.amazonaws.services.s3.model.DeleteObjectRequest;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
import com.amazonaws.services.s3.model.EncryptionMaterialsProvider;
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
import com.amazonaws.services.s3.model.GetObjectRequest;
Expand All @@ -74,13 +76,15 @@
import com.google.common.base.Splitter;
import com.google.common.collect.AbstractSequentialIterator;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.io.Closer;
import com.google.common.net.MediaType;
import io.airlift.log.Logger;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.trino.hdfs.FSDataInputStreamTail;
import io.trino.hdfs.FileSystemWithBatchDelete;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
Expand Down Expand Up @@ -115,6 +119,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -142,6 +147,7 @@
import static com.google.common.base.Throwables.throwIfInstanceOf;
import static com.google.common.base.Throwables.throwIfUnchecked;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Iterables.toArray;
import static com.google.common.hash.Hashing.md5;
import static io.airlift.concurrent.Threads.threadsNamed;
Expand Down Expand Up @@ -169,6 +175,7 @@

public class TrinoS3FileSystem
extends FileSystem
implements FileSystemWithBatchDelete
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aren't we moving away from use of hadoop's FileSystem towards TrinoFileSystem. Should TrinoFileSystem support batch delete?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it should, though currently only HdfsFileSystem implements it and it doesn't support batch delete.

{
public static final String S3_USER_AGENT_PREFIX = "trino.s3.user-agent-prefix";
public static final String S3_CREDENTIALS_PROVIDER = "trino.s3.credentials-provider";
Expand Down Expand Up @@ -230,6 +237,7 @@ public class TrinoS3FileSystem
private static final Set<String> GLACIER_STORAGE_CLASSES = ImmutableSet.of(Glacier.toString(), DeepArchive.toString());
private static final MediaType DIRECTORY_MEDIA_TYPE = MediaType.create("application", "x-directory");
private static final String S3_DEFAULT_ROLE_SESSION_NAME = "trino-session";
public static final int DELETE_BATCH_SIZE = 1000;

private URI uri;
private Path workingDirectory;
Expand Down Expand Up @@ -643,6 +651,35 @@ private boolean deleteObject(String key)
}
}

@Override
public void deleteFiles(Collection<Path> paths)
        throws IOException
{
    // Split the requested paths into chunks no larger than the S3 bulk-delete
    // limit and issue one DeleteObjects request per chunk.
    try {
        for (List<Path> batch : Iterables.partition(paths, DELETE_BATCH_SIZE)) {
            deletePaths(batch);
        }
    }
    catch (AmazonClientException e) {
        // Surface AWS client failures through the FileSystem IOException contract
        throw new IOException("Exception while batch deleting paths", e);
    }
}

private void deletePaths(List<Path> paths)
{
    // Translate each path into an S3 key and issue a single quiet bulk-delete
    // request against this file system's bucket
    DeleteObjectsRequest request = new DeleteObjectsRequest(getBucketName(uri))
            .withRequesterPays(requesterPaysEnabled)
            .withQuiet(true)
            .withKeys(paths.stream()
                    .map(TrinoS3FileSystem::keyFromPath)
                    .map(KeyVersion::new)
                    .collect(toImmutableList()));

    s3.deleteObjects(request);
}

@Override
public boolean mkdirs(Path f, FsPermission permission)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.viewfs.ViewFileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
Expand Down Expand Up @@ -96,6 +95,7 @@
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.io.BaseEncoding.base16;
import static io.trino.filesystem.FileSystemUtils.getRawFileSystem;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_DATABASE_LOCATION_ERROR;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_PARTITION_VALUE;
Expand Down Expand Up @@ -495,14 +495,6 @@ public static boolean isViewFileSystem(HdfsContext context, HdfsEnvironment hdfs
}
}

public static FileSystem getRawFileSystem(FileSystem fileSystem)
{
if (fileSystem instanceof FilterFileSystem) {
return getRawFileSystem(((FilterFileSystem) fileSystem).getRawFileSystem());
}
return fileSystem;
}

private static boolean isDirectory(HdfsContext context, HdfsEnvironment hdfsEnvironment, Path path)
{
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
import static io.airlift.concurrent.MoreFutures.getFutureValue;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.trino.filesystem.FileSystemUtils.getRawFileSystem;
import static io.trino.plugin.hive.AbstractTestHive.createTableProperties;
import static io.trino.plugin.hive.AbstractTestHive.filterNonHiddenColumnHandles;
import static io.trino.plugin.hive.AbstractTestHive.filterNonHiddenColumnMetadata;
Expand All @@ -105,7 +106,6 @@
import static io.trino.plugin.hive.HiveType.HIVE_LONG;
import static io.trino.plugin.hive.TestingThriftHiveMetastoreBuilder.testingThriftHiveMetastoreBuilder;
import static io.trino.plugin.hive.metastore.PrincipalPrivileges.NO_PRIVILEGES;
import static io.trino.plugin.hive.util.HiveWriteUtils.getRawFileSystem;
import static io.trino.spi.connector.MetadataProvider.NOOP_METADATA_PROVIDER;
import static io.trino.spi.connector.RetryMode.NO_RETRIES;
import static io.trino.spi.type.BigintType.BIGINT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -269,6 +270,8 @@ public class IcebergMetadata
private static final String NUMBER_OF_DISTINCT_VALUES_NAME = "NUMBER_OF_DISTINCT_VALUES";
private static final FunctionName NUMBER_OF_DISTINCT_VALUES_FUNCTION = new FunctionName(IcebergThetaSketchForStats.NAME);

private static final Integer DELETE_BATCH_SIZE = 1000;

private final TypeManager typeManager;
private final JsonCodec<CommitTaskData> commitTaskCodec;
private final TrinoCatalog catalog;
Expand Down Expand Up @@ -1176,10 +1179,32 @@ private void executeExpireSnapshots(ConnectorSession session, IcebergTableExecut
IcebergSessionProperties.EXPIRE_SNAPSHOTS_MIN_RETENTION);

long expireTimestampMillis = session.getStart().toEpochMilli() - retention.toMillis();
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
List<String> pathsToDelete = new ArrayList<>();
// deleteFunction is not accessed from multiple threads unless .executeDeleteWith() is used
Consumer<String> deleteFunction = path -> {
pathsToDelete.add(path);
if (pathsToDelete.size() == DELETE_BATCH_SIZE) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this check (and the deletion code associated to it) still needed?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes it is still needed, I don't think we want to create a very huge list here and store in memory, we can discuss the value of DELETE_BATCH_SIZE but I think in general this check is needed

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tasks.foreach(manifestsToDelete)
        .executeWith(deleteExecutorService)
        .retry(3).stopRetryOn(NotFoundException.class).suppressFailureWhenFinished()
        .onFailure((manifest, exc) -> LOG.warn("Delete failed for manifest: {}", manifest, exc))
        .run(deleteFunc::accept)

From org.apache.iceberg.RemoveSnapshots.

You may end up in a situation where you add stuff with Tasks.foreach, but in another thread you reach DELETE_BATCH_SIZE and clear also paths that were not yet deleted.

Copy link
Copy Markdown
Member Author

@homar homar Oct 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not true, current implementation in the worst case deletes file at the same time as the old one. The old one deletes files when deleteFunc::accept is called the new one either adds file to the list to be deleted later or deletes it at the time of the invocation.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in a threaded context many things go wrong

  • ArrayList can get corrupted, we may loose files
  • pathsToDelete.clear(); can be invoked with some paths never sent to deletion
  • deleteFiles can be invoked twice on same/similar sets of paths and may result in "file missing so cannot delete" exception

try {
fileSystem.deleteFiles(pathsToDelete);
pathsToDelete.clear();
}
catch (IOException e) {
throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed to delete files during snapshot expiration", e);
}
}
};

table.expireSnapshots()
.expireOlderThan(expireTimestampMillis)
.deleteWith(deleteFunction)
Comment thread
findepi marked this conversation as resolved.
Outdated
.commit();
try {
fileSystem.deleteFiles(pathsToDelete);
}
catch (IOException e) {
throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed to delete files during snapshot expiration", e);
}
}

private static void validateTableExecuteParameters(
Expand Down
Loading