diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java index d5cf201b171bb..6daf69ebd66e6 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java @@ -82,6 +82,7 @@ import org.opensearch.common.blobstore.support.PlainBlobMetadata; import org.opensearch.common.collect.Tuple; import org.opensearch.common.io.InputStreamContainer; +import org.opensearch.common.util.concurrent.FutureUtils; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.Strings; import org.opensearch.core.common.unit.ByteSizeUnit; @@ -100,6 +101,8 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.stream.Collectors; @@ -114,6 +117,7 @@ class S3BlobContainer extends AbstractBlobContainer implements AsyncMultiStreamBlobContainer { private static final Logger logger = LogManager.getLogger(S3BlobContainer.class); + private static final long DEFAULT_OPERATION_TIMEOUT = TimeUnit.SECONDS.toSeconds(30); private final S3BlobStore blobStore; private final String keyPath; @@ -389,7 +393,7 @@ public void deleteBlobsIgnoringIfNotExists(List blobNames) throws IOExce private T getFutureValue(PlainActionFuture future) throws IOException { try { - return future.get(); + return future.get(DEFAULT_OPERATION_TIMEOUT, TimeUnit.SECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IllegalStateException("Future got interrupted", e); @@ -398,6 +402,9 @@ private T getFutureValue(PlainActionFuture future) throws IOException { throw (IOException) e.getCause(); } throw new RuntimeException(e.getCause()); + } catch (TimeoutException e) { + FutureUtils.cancel(future); + throw new IOException("Delete operation timed out after 30 seconds", e); } } @@ -782,6 +789,7 @@ CompletableFuture getBlobMetadata(S3AsyncClient s3A @Override public void deleteAsync(ActionListener completionListener) { + logger.debug("Starting async deletion for path [{}]", keyPath); try (AmazonAsyncS3Reference asyncClientReference = blobStore.asyncClientReference()) { S3AsyncClient s3AsyncClient = asyncClientReference.get().client(); @@ -805,6 +813,7 @@ public void deleteAsync(ActionListener completionListener) { @Override public void onSubscribe(Subscription s) { this.subscription = s; + logger.debug("Subscribed to list objects publisher for path [{}]", keyPath); subscription.request(1); } @@ -816,6 +825,8 @@ public void onNext(ListObjectsV2Response response) { objectsToDelete.add(s3Object.key()); }); + logger.debug("Found {} objects to delete in current batch for path [{}]", response.contents().size(), keyPath); + int bulkDeleteSize = blobStore.getBulkDeletesSize(); if (objectsToDelete.size() >= bulkDeleteSize) { int fullBatchesCount = objectsToDelete.size() / bulkDeleteSize; @@ -824,6 +835,7 @@ public void onNext(ListObjectsV2Response response) { List batchToDelete = new ArrayList<>(objectsToDelete.subList(0, itemsToDelete)); objectsToDelete.subList(0, itemsToDelete).clear(); + logger.debug("Executing bulk delete of {} objects for path [{}]", batchToDelete.size(), keyPath); deletionChain = S3AsyncDeleteHelper.executeDeleteChain( s3AsyncClient, blobStore, @@ -838,12 +850,19 @@ public void onNext(ListObjectsV2Response response) { @Override public void onError(Throwable t) { + logger.error(() -> new ParameterizedMessage("Failed to list objects for deletion in path [{}]", keyPath), t); listingFuture.completeExceptionally(new IOException("Failed to list objects for deletion", t)); } @Override public void onComplete() { + logger.debug( + "Completed listing objects for path [{}], remaining objects to delete: {}", + keyPath, + objectsToDelete.size() + ); if (!objectsToDelete.isEmpty()) { + logger.debug("Executing final bulk delete of {} objects for path [{}]", objectsToDelete.size(), keyPath); deletionChain = S3AsyncDeleteHelper.executeDeleteChain( s3AsyncClient, blobStore, @@ -854,8 +873,13 @@ public void onComplete() { } deletionChain.whenComplete((v, throwable) -> { if (throwable != null) { + logger.error( + () -> new ParameterizedMessage("Failed to complete deletion chain for path [{}]", keyPath), + throwable + ); listingFuture.completeExceptionally(throwable); } else { + logger.debug("Successfully completed deletion chain for path [{}]", keyPath); listingFuture.complete(null); } }); @@ -864,16 +888,24 @@ public void onComplete() { listingFuture.whenComplete((v, throwable) -> { if (throwable != null) { + logger.error(() -> new ParameterizedMessage("Failed to complete async deletion for path [{}]", keyPath), throwable); completionListener.onFailure( throwable instanceof Exception ? (Exception) throwable : new IOException("Unexpected error during async deletion", throwable) ); } else { + logger.debug( + "Successfully completed async deletion for path [{}]. Deleted {} blobs totaling {} bytes", + keyPath, + deletedBlobs.get(), + deletedBytes.get() + ); completionListener.onResponse(new DeleteResult(deletedBlobs.get(), deletedBytes.get())); } }); } catch (Exception e) { + logger.error(() -> new ParameterizedMessage("Failed to initiate async deletion for path [{}]", keyPath), e); completionListener.onFailure(new IOException("Failed to initiate async deletion", e)); } } diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/S3AsyncDeleteHelper.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/S3AsyncDeleteHelper.java index eed95c0e68ef1..57cb4bd825632 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/S3AsyncDeleteHelper.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/S3AsyncDeleteHelper.java @@ -34,9 +34,12 @@ public static CompletableFuture executeDeleteChain( CompletableFuture currentChain, Runnable afterDeleteAction ) { + logger.debug("Starting delete chain execution for {} objects", objectsToDelete.size()); List> batches = createDeleteBatches(objectsToDelete, blobStore.getBulkDeletesSize()); + logger.debug("Created {} delete batches", batches.size()); CompletableFuture newChain = currentChain.thenCompose(v -> executeDeleteBatches(s3AsyncClient, blobStore, batches)); if (afterDeleteAction != null) { + logger.debug("Adding post-delete action to the chain"); newChain = newChain.thenRun(afterDeleteAction); } return newChain; @@ -45,42 +48,58 @@ public static CompletableFuture executeDeleteChain( static List> createDeleteBatches(List keys, int bulkDeleteSize) { List> batches = new ArrayList<>(); for (int i = 0; i < keys.size(); i += bulkDeleteSize) { - batches.add(keys.subList(i, Math.min(keys.size(), i + bulkDeleteSize))); + int batchSize = Math.min(keys.size() - i, bulkDeleteSize); + batches.add(keys.subList(i, i + batchSize)); + logger.debug("Created delete batch of size {} starting at index {}", batchSize, i); } return batches; } static CompletableFuture executeDeleteBatches(S3AsyncClient s3AsyncClient, S3BlobStore blobStore, List> batches) { + logger.debug("Starting execution of {} delete batches", batches.size()); CompletableFuture allDeletesFuture = CompletableFuture.completedFuture(null); - for (List batch : batches) { + for (int i = 0; i < batches.size(); i++) { + List batch = batches.get(i); + logger.debug("Queueing batch {} of {} with {} objects", i + 1, batches.size(), batch.size()); allDeletesFuture = allDeletesFuture.thenCompose(v -> executeSingleDeleteBatch(s3AsyncClient, blobStore, batch)); } - return allDeletesFuture; + return allDeletesFuture.whenComplete((v, throwable) -> { + if (throwable != null) { + logger.error("Failed to complete delete batches execution", throwable); + } else { + logger.debug("Completed execution of all delete batches"); + } + }); } static CompletableFuture executeSingleDeleteBatch(S3AsyncClient s3AsyncClient, S3BlobStore blobStore, List batch) { + logger.debug("Executing delete batch of {} objects", batch.size()); DeleteObjectsRequest deleteRequest = bulkDelete(blobStore.bucket(), batch, blobStore); - return s3AsyncClient.deleteObjects(deleteRequest).thenApply(S3AsyncDeleteHelper::processDeleteResponse); + return s3AsyncClient.deleteObjects(deleteRequest).thenApply(response -> { + logger.debug("Received delete response for batch of {} objects", batch.size()); + return processDeleteResponse(response); + }); } static Void processDeleteResponse(DeleteObjectsResponse deleteObjectsResponse) { - if (!deleteObjectsResponse.errors().isEmpty()) { + if (deleteObjectsResponse.errors().isEmpty()) { + logger.debug("Successfully processed delete response with no errors"); + } else { + List errorDetails = deleteObjectsResponse.errors() + .stream() + .map(s3Error -> "[" + s3Error.key() + "][" + s3Error.code() + "][" + s3Error.message() + "]") + .collect(Collectors.toList()); logger.warn( - () -> new ParameterizedMessage( - "Failed to delete some blobs {}", - deleteObjectsResponse.errors() - .stream() - .map(s3Error -> "[" + s3Error.key() + "][" + s3Error.code() + "][" + s3Error.message() + "]") - .collect(Collectors.toList()) - ) + () -> new ParameterizedMessage("Failed to delete {} objects: {}", deleteObjectsResponse.errors().size(), errorDetails) ); } return null; } static DeleteObjectsRequest bulkDelete(String bucket, List blobs, S3BlobStore blobStore) { + logger.debug("Creating bulk delete request for {} objects in bucket {}", blobs.size(), bucket); return DeleteObjectsRequest.builder() .bucket(bucket) .delete( diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java index d3725642760dc..1a4cf6632508b 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java @@ -2057,6 +2057,72 @@ public void testDeleteBlobsIgnoringIfNotExistsWithExecutionException() throws Ex assertEquals(simulatedError, e.getCause().getCause()); } + public void testDeleteTimeoutWithNeverCompletingAsyncDeletionFuture() throws Exception { + final String bucketName = randomAlphaOfLengthBetween(1, 10); + final BlobPath blobPath = new BlobPath(); + + final S3BlobStore blobStore = mock(S3BlobStore.class); + when(blobStore.bucket()).thenReturn(bucketName); + when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher()); + when(blobStore.getBulkDeletesSize()).thenReturn(1000); + + final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class); + final AmazonAsyncS3Reference asyncClientReference = mock(AmazonAsyncS3Reference.class); + when(blobStore.asyncClientReference()).thenReturn(asyncClientReference); + AmazonAsyncS3WithCredentials amazonAsyncS3WithCredentials = AmazonAsyncS3WithCredentials.create( + s3AsyncClient, + s3AsyncClient, + s3AsyncClient, + null + ); + when(asyncClientReference.get()).thenReturn(amazonAsyncS3WithCredentials); + + // Create a future that never completes + CompletableFuture neverCompletingFuture = new CompletableFuture<>(); + when(s3AsyncClient.deleteObjects(any(DeleteObjectsRequest.class))).thenReturn(neverCompletingFuture); + + // Create a publisher that emits one item and completes + final ListObjectsV2Publisher listPublisher = mock(ListObjectsV2Publisher.class); + final CountDownLatch publisherCompletedLatch = new CountDownLatch(1); + final AtomicBoolean hasEmittedItem = new AtomicBoolean(false); + + doAnswer(invocation -> { + Subscriber subscriber = invocation.getArgument(0); + subscriber.onSubscribe(new Subscription() { + @Override + public void request(long n) { + if (!hasEmittedItem.getAndSet(true)) { + subscriber.onNext( + ListObjectsV2Response.builder() + .contents(Collections.singletonList(S3Object.builder().key("test-key").size(100L).build())) + .build() + ); + publisherCompletedLatch.countDown(); + } else { + subscriber.onComplete(); + } + } + + @Override + public void cancel() {} + }); + return null; + }).when(listPublisher).subscribe(ArgumentMatchers.>any()); + + when(s3AsyncClient.listObjectsV2Paginator(any(ListObjectsV2Request.class))).thenReturn(listPublisher); + + final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); + + IOException ex = assertThrows(IOException.class, blobContainer::delete); + assertEquals("Delete operation timed out after 30 seconds", ex.getMessage()); + + // Wait for publisher to complete + assertTrue("Publisher should complete", publisherCompletedLatch.await(1, TimeUnit.SECONDS)); + + verify(s3AsyncClient, times(1)).listObjectsV2Paginator(any(ListObjectsV2Request.class)); + verify(s3AsyncClient, times(1)).deleteObjects(any(DeleteObjectsRequest.class)); + } + private void mockObjectResponse(S3AsyncClient s3AsyncClient, String bucketName, String blobName, int objectSize) { final InputStream inputStream = new ByteArrayInputStream(randomByteArrayOfLength(objectSize));