diff --git a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeFutures.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeFutures.java new file mode 100644 index 000000000000..d8f1f81f59dc --- /dev/null +++ b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeFutures.java @@ -0,0 +1,41 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.exchange.filesystem; + +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + +import java.io.IOException; + +import static com.google.common.util.concurrent.Futures.immediateFailedFuture; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; +import static io.airlift.concurrent.MoreFutures.asVoid; + +public final class FileSystemExchangeFutures +{ + private FileSystemExchangeFutures() {} + + // Helper function that translates exception and transform future type to avoid abstraction leak + public static ListenableFuture translateFailures(ListenableFuture listenableFuture) + { + return asVoid(Futures.catchingAsync(listenableFuture, Throwable.class, throwable -> { + if (throwable instanceof Error || throwable instanceof IOException) { + return immediateFailedFuture(throwable); + } + else { + return immediateFailedFuture(new IOException(throwable)); + } + }, directExecutor())); + } +} diff --git a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/azure/AzureBlobFileSystemExchangeStorage.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/azure/AzureBlobFileSystemExchangeStorage.java index 1841cb15e672..3696052d5051 100644 --- a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/azure/AzureBlobFileSystemExchangeStorage.java +++ b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/azure/AzureBlobFileSystemExchangeStorage.java @@ -31,6 +31,9 @@ import com.azure.storage.blob.models.ListBlobsOptions; import com.azure.storage.blob.specialized.BlockBlobAsyncClient; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.Multimap; import com.google.common.util.concurrent.FluentFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -77,6 +80,7 @@ import static io.airlift.concurrent.MoreFutures.getFutureValue; import static io.airlift.concurrent.MoreFutures.toListenableFuture; import static io.airlift.slice.SizeOf.estimatedSizeOf; +import static io.trino.plugin.exchange.filesystem.FileSystemExchangeFutures.translateFailures; import static io.trino.plugin.exchange.filesystem.FileSystemExchangeManager.PATH_SEPARATOR; import static java.lang.Math.min; import static java.lang.Math.toIntExact; @@ -150,7 +154,7 @@ public ListenableFuture createEmptyFile(URI file) { String containerName = getContainerName(file); String blobName = getPath(file); - return asVoid(toListenableFuture(blobServiceAsyncClient + return translateFailures(toListenableFuture(blobServiceAsyncClient .getBlobContainerAsyncClient(containerName) .getBlobAsyncClient(blobName) .upload(BinaryData.fromString("")) @@ -160,12 +164,32 @@ public ListenableFuture createEmptyFile(URI file) @Override public ListenableFuture deleteRecursively(List directories) { - return asVoid(Futures.allAsList(directories.stream() - .map(dir -> Futures.transformAsync( - toListenableFuture(listObjectsRecursively(dir).byPage().collectList().toFuture()), - pagedResponseList -> deleteObjects(getContainerName(dir), pagedResponseList), - directExecutor())) - .collect(toImmutableList()))); + ImmutableMultimap.Builder>>> containerToListObjectsFuturesBuilder = ImmutableMultimap.builder(); + directories.forEach(dir -> containerToListObjectsFuturesBuilder.put( + getContainerName(dir), + toListenableFuture(listObjectsRecursively(dir).byPage().collectList().toFuture()))); + Multimap>>> containerToListObjectsFutures = containerToListObjectsFuturesBuilder.build(); + + ImmutableList.Builder>> deleteObjectsFutures = ImmutableList.builder(); + for (String containerName : containerToListObjectsFutures.keySet()) { + BlobContainerClient blobContainerClient = blobServiceClient.getBlobContainerClient(containerName); + deleteObjectsFutures.add(Futures.transformAsync( + Futures.allAsList(containerToListObjectsFutures.get(containerName)), + nestedPagedResponseList -> { + ImmutableList.Builder blobUrls = ImmutableList.builder(); + for (List> pagedResponseList : nestedPagedResponseList) { + for (PagedResponse pagedResponse : pagedResponseList) { + pagedResponse.getValue().forEach(blobItem -> { + blobUrls.add(blobContainerClient.getBlobClient(blobItem.getName()).getBlobUrl()); + }); + } + } + return deleteObjects(blobUrls.build()); + }, + directExecutor())); + } + + return translateFailures(Futures.allAsList(deleteObjectsFutures.build())); } @Override @@ -241,23 +265,16 @@ private PagedFlux listObjectsRecursively(URI dir) return blobServiceAsyncClient .getBlobContainerAsyncClient(containerName) - // deleteBlobs can delete at most 256 blobs at a time - .listBlobsByHierarchy(null, (new ListBlobsOptions()).setPrefix(directoryPath).setMaxResultsPerPage(256)); + .listBlobsByHierarchy(null, (new ListBlobsOptions()).setPrefix(directoryPath)); } - private ListenableFuture> deleteObjects(String containerName, List> pageBlobItems) + private ListenableFuture> deleteObjects(List blobUrls) { BlobBatchAsyncClient blobBatchAsyncClient = new BlobBatchClientBuilder(blobServiceClient).buildAsyncClient(); - BlobContainerClient blobContainerClient = blobServiceClient.getBlobContainerClient(containerName); - - return Futures.allAsList(pageBlobItems.stream().map(pageBlobItem -> { - if (pageBlobItem.getValue().isEmpty()) { - return immediateVoidFuture(); - } - ImmutableList.Builder builder = ImmutableList.builder(); - pageBlobItem.getValue().forEach(blobItem -> builder.add(blobContainerClient.getBlobClient(blobItem.getName()).getBlobUrl())); - return toListenableFuture(blobBatchAsyncClient.deleteBlobs(builder.build(), DeleteSnapshotsOptionType.INCLUDE).then().toFuture()); - }).collect(toImmutableList())); + // deleteBlobs can delete at most 256 blobs at a time + return Futures.allAsList(Lists.partition(blobUrls, 256).stream() + .map(list -> toListenableFuture(blobBatchAsyncClient.deleteBlobs(list, DeleteSnapshotsOptionType.INCLUDE).then().toFuture())) + .collect(toImmutableList())); } // URI format: abfs[s]://@.dfs.core.windows.net// @@ -505,7 +522,7 @@ public ListenableFuture write(Slice slice) // Skip multipart upload if there would only be one part if (slice.length() < blockSize && multiPartUploadFutures.isEmpty()) { - directUploadFuture = asVoid(toListenableFuture(blockBlobAsyncClient.upload(Flux.just(slice.toByteBuffer()), slice.length()).toFuture())); + directUploadFuture = translateFailures(toListenableFuture(blockBlobAsyncClient.upload(Flux.just(slice.toByteBuffer()), slice.length()).toFuture())); return directUploadFuture; } @@ -513,7 +530,7 @@ public ListenableFuture write(Slice slice) ListenableFuture uploadFuture = toListenableFuture(blockBlobAsyncClient.stageBlock(blockId, Flux.just(slice.toByteBuffer()), slice.length()).toFuture()); multiPartUploadFutures.add(uploadFuture); blockIds.add(blockId); - return uploadFuture; + return translateFailures(uploadFuture); } @Override @@ -527,10 +544,10 @@ public ListenableFuture finish() return requireNonNullElseGet(directUploadFuture, Futures::immediateVoidFuture); } - ListenableFuture finishFuture = Futures.transformAsync( + ListenableFuture finishFuture = translateFailures(Futures.transformAsync( Futures.allAsList(multiPartUploadFutures), - ignored -> asVoid(toListenableFuture(blockBlobAsyncClient.commitBlockList(blockIds).toFuture())), - directExecutor()); + ignored -> toListenableFuture(blockBlobAsyncClient.commitBlockList(blockIds).toFuture()), + directExecutor())); Futures.addCallback(finishFuture, new FutureCallback<>() { @Override public void onSuccess(Void result) diff --git a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorage.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorage.java index 3f71b86df307..e87eb5f494cf 100644 --- a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorage.java +++ b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorage.java @@ -21,7 +21,9 @@ import com.google.cloud.storage.StorageBatch; import com.google.cloud.storage.StorageOptions; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMultimap; import com.google.common.collect.Lists; +import com.google.common.collect.Multimap; import com.google.common.io.Closer; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -89,6 +91,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Optional; import java.util.Queue; @@ -100,7 +103,6 @@ import static com.google.common.base.Strings.nullToEmpty; import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; -import static com.google.common.util.concurrent.Futures.immediateFailedFuture; import static com.google.common.util.concurrent.Futures.immediateVoidFuture; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator; @@ -108,6 +110,7 @@ import static io.airlift.concurrent.MoreFutures.getFutureValue; import static io.airlift.concurrent.MoreFutures.toListenableFuture; import static io.airlift.concurrent.Threads.threadsNamed; +import static io.trino.plugin.exchange.filesystem.FileSystemExchangeFutures.translateFailures; import static io.trino.plugin.exchange.filesystem.FileSystemExchangeManager.PATH_SEPARATOR; import static io.trino.plugin.exchange.filesystem.s3.S3FileSystemExchangeStorage.CompatibilityMode.GCP; import static io.trino.plugin.exchange.filesystem.s3.S3RequestUtil.configureEncryption; @@ -232,7 +235,7 @@ public ListenableFuture createEmptyFile(URI file) .key(keyFromUri(file)) .build(); - return stats.getCreateEmptyFile().record(transformFuture(toListenableFuture(s3AsyncClient.putObject(request, AsyncRequestBody.empty())))); + return stats.getCreateEmptyFile().record(translateFailures(toListenableFuture(s3AsyncClient.putObject(request, AsyncRequestBody.empty())))); } @Override @@ -254,14 +257,32 @@ public ListenableFuture deleteRecursively(List directories) }))); } else { - return stats.getDeleteRecursively().record(asVoid(Futures.allAsList(directories.stream().map(dir -> { + ImmutableMultimap.Builder>> bucketToListObjectsFuturesBuilder = ImmutableMultimap.builder(); + for (URI dir : directories) { ImmutableList.Builder keys = ImmutableList.builder(); - return transformFuture(Futures.transformAsync( - toListenableFuture((listObjectsRecursively(dir).subscribe(listObjectsV2Response -> - listObjectsV2Response.contents().stream().map(S3Object::key).forEach(keys::add)))), - ignored -> deleteObjects(getBucketName(dir), keys.build()), + ListenableFuture> listObjectsFuture = Futures.transform( + toListenableFuture((listObjectsRecursively(dir) + .subscribe(listObjectsV2Response -> listObjectsV2Response.contents().stream() + .map(S3Object::key) + .forEach(keys::add)))), + ignored -> keys.build(), + directExecutor()); + bucketToListObjectsFuturesBuilder.put(getBucketName(dir), listObjectsFuture); + } + Multimap>> bucketToListObjectsFutures = bucketToListObjectsFuturesBuilder.build(); + + ImmutableList.Builder>> deleteObjectsFutures = ImmutableList.builder(); + for (String bucketName : bucketToListObjectsFutures.keySet()) { + deleteObjectsFutures.add(Futures.transformAsync( + Futures.allAsList(bucketToListObjectsFutures.get(bucketName)), + keys -> deleteObjects( + bucketName, + keys.stream() + .flatMap(Collection::stream) + .collect(toImmutableList())), directExecutor())); - }).collect(toImmutableList())))); + } + return translateFailures(Futures.allAsList(deleteObjectsFutures.build())); } } @@ -432,19 +453,6 @@ private static String keyFromUri(URI uri) return key; } - // Helper function that translates exception and transform future type to avoid abstraction leak - private static ListenableFuture transformFuture(ListenableFuture listenableFuture) - { - return asVoid(Futures.catchingAsync(listenableFuture, Throwable.class, throwable -> { - if (throwable instanceof Error || throwable instanceof IOException) { - return immediateFailedFuture(throwable); - } - else { - return immediateFailedFuture(new IOException(throwable)); - } - }, directExecutor())); - } - private static boolean isDirectory(URI uri) { return uri.toString().endsWith(PATH_SEPARATOR); @@ -729,7 +737,7 @@ public ListenableFuture write(Slice slice) .key(key) .storageClass(storageClass); configureEncryption(secretKey, putObjectRequestBuilder); - directUploadFuture = transformFuture(toListenableFuture(s3AsyncClient.putObject(putObjectRequestBuilder.build(), + directUploadFuture = translateFailures(toListenableFuture(s3AsyncClient.putObject(putObjectRequestBuilder.build(), ByteBufferAsyncRequestBody.fromByteBuffer(slice.toByteBuffer())))); stats.getPutObject().record(directUploadFuture); stats.getPutObjectDataSizeInBytes().add(slice.length()); @@ -744,7 +752,7 @@ public ListenableFuture write(Slice slice) ListenableFuture uploadFuture = Futures.transformAsync(multiPartUploadIdFuture, uploadId -> uploadPart(uploadId, slice, partNum), directExecutor()); multiPartUploadFutures.add(uploadFuture); - return transformFuture(uploadFuture); + return translateFailures(uploadFuture); } @Override @@ -758,7 +766,7 @@ public ListenableFuture finish() return requireNonNullElseGet(directUploadFuture, Futures::immediateVoidFuture); } - ListenableFuture finishFuture = transformFuture(Futures.transformAsync( + ListenableFuture finishFuture = translateFailures(Futures.transformAsync( Futures.allAsList(multiPartUploadFutures), completedParts -> completeMultipartUpload(getFutureValue(multiPartUploadIdFuture), completedParts), directExecutor())); @@ -795,7 +803,7 @@ public ListenableFuture abort() verify(directUploadFuture == null); multiPartUploadFutures.forEach(future -> future.cancel(true)); - return transformFuture(Futures.transformAsync(multiPartUploadIdFuture, this::abortMultipartUpload, directExecutor())); + return translateFailures(Futures.transformAsync(multiPartUploadIdFuture, this::abortMultipartUpload, directExecutor())); } @Override