diff --git a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSink.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSink.java index 9ac90b0c3aeb..8657cf81f26e 100644 --- a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSink.java +++ b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSink.java @@ -333,6 +333,7 @@ public synchronized long getRetainedSize() return INSTANCE_SIZE + estimatedSizeOf(writers, ExchangeStorageWriter::getRetainedSize); } + @GuardedBy("this") private void setupWriterForNextPart() { currentWriter = exchangeStorage.createExchangeStorageWriter( @@ -340,6 +341,7 @@ private void setupWriterForNextPart() writers.add(currentWriter); } + @GuardedBy("this") private void writeInternal(Slice slice) { int position = 0; @@ -359,6 +361,7 @@ private void writeInternal(Slice slice) } } + @GuardedBy("this") private void flushIfNeeded(boolean finished) { SliceOutput buffer = currentBuffer; @@ -410,9 +413,7 @@ public synchronized CompletableFuture isBlocked() } return blockedFuture; } - else { - return NOT_BLOCKED; - } + return NOT_BLOCKED; } public synchronized SliceOutput take() @@ -456,9 +457,7 @@ public synchronized long getRetainedSize() if (closed) { return INSTANCE_SIZE; } - else { - return INSTANCE_SIZE + numBuffers * bufferRetainedSize; - } + return INSTANCE_SIZE + numBuffers * bufferRetainedSize; } public void close() diff --git a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorage.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorage.java index 5c93a1176155..8c2c6809e719 100644 --- a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorage.java +++ b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorage.java @@ -202,7 +202,6 @@ public S3FileSystemExchangeStorage(S3FileSystemExchangeStorageStats stats, Excha @Override public void createDirectories(URI dir) - throws IOException { // Nothing to do for S3 } @@ -245,48 +244,52 @@ public ListenableFuture createEmptyFile(URI file) public ListenableFuture deleteRecursively(List directories) { if (compatibilityMode == GCP) { - // GCS is not compatible with S3's multi-object delete API https://cloud.google.com/storage/docs/migrating#methods-comparison - Storage storage = gcsClient.orElseThrow(() -> new IllegalStateException("gcsClient is expected to be initialized")); - ListeningExecutorService deleteExecutor = gcsDeleteExecutor.orElseThrow(() -> new IllegalStateException("gcsDeleteExecutor is expected to be initialized")); - return stats.getDeleteRecursively().record(asVoid(deleteExecutor.submit(() -> { - StorageBatch batch = storage.batch(); - for (URI dir : directories) { - Page blobs = storage.list(getBucketName(dir), Storage.BlobListOption.prefix(keyFromUri(dir))); - for (Blob blob : blobs.iterateAll()) { - batch.delete(blob.getBlobId()); - } - } - batch.submit(); - }))); + return deleteRecursivelyGcp(directories); + } + + ImmutableMultimap.Builder>> bucketToListObjectsFuturesBuilder = ImmutableMultimap.builder(); + for (URI dir : directories) { + ImmutableList.Builder keys = ImmutableList.builder(); + ListenableFuture> listObjectsFuture = Futures.transform( + toListenableFuture((listObjectsRecursively(dir) + .subscribe(listObjectsV2Response -> listObjectsV2Response.contents().stream() + .map(S3Object::key) + .forEach(keys::add)))), + ignored -> keys.build(), + directExecutor()); + bucketToListObjectsFuturesBuilder.put(getBucketName(dir), listObjectsFuture); + } + Multimap>> bucketToListObjectsFutures = bucketToListObjectsFuturesBuilder.build(); + + ImmutableList.Builder>> deleteObjectsFutures = ImmutableList.builder(); + for (String bucketName : bucketToListObjectsFutures.keySet()) { + deleteObjectsFutures.add(Futures.transformAsync( + Futures.allAsList(bucketToListObjectsFutures.get(bucketName)), + keys -> deleteObjects( + bucketName, + keys.stream() + .flatMap(Collection::stream) + .collect(toImmutableList())), + directExecutor())); } - else { - ImmutableMultimap.Builder>> bucketToListObjectsFuturesBuilder = ImmutableMultimap.builder(); + return translateFailures(Futures.allAsList(deleteObjectsFutures.build())); + } + + private ListenableFuture deleteRecursivelyGcp(List directories) + { + // GCS is not compatible with S3's multi-object delete API https://cloud.google.com/storage/docs/migrating#methods-comparison + Storage storage = gcsClient.orElseThrow(() -> new IllegalStateException("gcsClient is expected to be initialized")); + ListeningExecutorService deleteExecutor = gcsDeleteExecutor.orElseThrow(() -> new IllegalStateException("gcsDeleteExecutor is expected to be initialized")); + return stats.getDeleteRecursively().record(translateFailures(deleteExecutor.submit(() -> { + StorageBatch batch = storage.batch(); for (URI dir : directories) { - ImmutableList.Builder keys = ImmutableList.builder(); - ListenableFuture> listObjectsFuture = Futures.transform( - toListenableFuture((listObjectsRecursively(dir) - .subscribe(listObjectsV2Response -> listObjectsV2Response.contents().stream() - .map(S3Object::key) - .forEach(keys::add)))), - ignored -> keys.build(), - directExecutor()); - bucketToListObjectsFuturesBuilder.put(getBucketName(dir), listObjectsFuture); - } - Multimap>> bucketToListObjectsFutures = bucketToListObjectsFuturesBuilder.build(); - - ImmutableList.Builder>> deleteObjectsFutures = ImmutableList.builder(); - for (String bucketName : bucketToListObjectsFutures.keySet()) { - deleteObjectsFutures.add(Futures.transformAsync( - Futures.allAsList(bucketToListObjectsFutures.get(bucketName)), - keys -> deleteObjects( - bucketName, - keys.stream() - .flatMap(Collection::stream) - .collect(toImmutableList())), - directExecutor())); + Page blobs = storage.list(getBucketName(dir), Storage.BlobListOption.prefix(keyFromUri(dir))); + for (Blob blob : blobs.iterateAll()) { + batch.delete(blob.getBlobId()); + } } - return translateFailures(Futures.allAsList(deleteObjectsFutures.build())); - } + batch.submit(); + }))); } @Override @@ -651,6 +654,7 @@ public synchronized void close() inProgressReadFuture = immediateVoidFuture(); // such that we don't retain reference to the buffer } + @GuardedBy("this") private void fillBuffer() { if (currentFile == null || fileOffset == currentFile.getFileSize()) {