From 86ef526c6b1a86693d5d7293e64ec09716ffe1b7 Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Wed, 20 Nov 2024 16:53:20 +0100 Subject: [PATCH] Check block existence with object operation (#750) Signed-off-by: Paolo Di Tommaso Co-authored-by: Munish Chouhan --- .../blob/impl/BlobCacheServiceImpl.groovy | 50 ++++++++++--------- .../blob/BlobStateStoreImplTest.groovy | 8 +++ 2 files changed, 34 insertions(+), 24 deletions(-) diff --git a/src/main/groovy/io/seqera/wave/service/blob/impl/BlobCacheServiceImpl.groovy b/src/main/groovy/io/seqera/wave/service/blob/impl/BlobCacheServiceImpl.groovy index 606a4ecd3..c6c3b61d7 100644 --- a/src/main/groovy/io/seqera/wave/service/blob/impl/BlobCacheServiceImpl.groovy +++ b/src/main/groovy/io/seqera/wave/service/blob/impl/BlobCacheServiceImpl.groovy @@ -17,9 +17,6 @@ */ package io.seqera.wave.service.blob.impl -import java.net.http.HttpClient -import java.net.http.HttpRequest -import java.net.http.HttpResponse import groovy.transform.CompileStatic import groovy.util.logging.Slf4j @@ -29,7 +26,6 @@ import io.seqera.wave.configuration.BlobCacheConfig import io.seqera.wave.configuration.HttpClientConfig import io.seqera.wave.core.RegistryProxyService import io.seqera.wave.core.RoutePath -import io.seqera.wave.http.HttpClientFactory import io.seqera.wave.service.blob.BlobCacheService import io.seqera.wave.service.blob.BlobEntry import io.seqera.wave.service.blob.BlobSigningService @@ -38,14 +34,16 @@ import io.seqera.wave.service.job.JobHandler import io.seqera.wave.service.job.JobService import io.seqera.wave.service.job.JobSpec import io.seqera.wave.service.job.JobState +import io.seqera.wave.util.BucketTokenizer import io.seqera.wave.util.Escape -import io.seqera.wave.util.Retryable import io.seqera.wave.util.StringUtils import jakarta.annotation.PostConstruct import jakarta.inject.Inject import jakarta.inject.Named import jakarta.inject.Singleton -import static io.seqera.wave.WaveDefault.HTTP_SERVER_ERRORS +import software.amazon.awssdk.services.s3.S3Client +import software.amazon.awssdk.services.s3.model.HeadObjectRequest +import software.amazon.awssdk.services.s3.model.S3Exception /** * Implements cache for container image layer blobs * @@ -79,11 +77,12 @@ class BlobCacheServiceImpl implements BlobCacheService, JobHandler { @Inject private HttpClientConfig httpConfig - private HttpClient httpClient + @Inject + @Named('BlobS3Client') + private S3Client s3Client @PostConstruct private void init() { - httpClient = HttpClientFactory.followRedirectsHttpClient() log.info "Creating Blob cache service - $blobConfig" } @@ -98,7 +97,7 @@ class BlobCacheServiceImpl implements BlobCacheService, JobHandler { // therefore it's safe to check and return directly // if it exists (no risk of returning a partial upload) // https://developers.cloudflare.com/r2/reference/consistency/ - if( blobExists(info.locationUri) && !debug ) { + if( blobExists(info.objectUri) && !debug ) { log.debug "== Blob cache exists for object '${info.locationUri}'" return info.cached() } @@ -113,21 +112,24 @@ class BlobCacheServiceImpl implements BlobCacheService, JobHandler { return result?.withLocation(locationUri) } - protected boolean blobExists(String uri) { - final request = HttpRequest - .newBuilder(new URI(uri)) - .method("HEAD", HttpRequest.BodyPublishers.noBody()) - .build() - - // retry strategy - final retryable = Retryable - .>of(httpConfig) - .retryIf((response) -> response.statusCode() in HTTP_SERVER_ERRORS) - .onRetry((event) -> log.warn("Unable to connect '$uri' - event: $event")) - - // submit the request - final resp = retryable.apply(()-> httpClient.send(request, HttpResponse.BodyHandlers.ofString())) - return resp.statusCode() == 200 + protected boolean blobExists(String blobLocation) { + try { + final object = BucketTokenizer.from(blobLocation) + final request = HeadObjectRequest + .builder() + .bucket(object.bucket) + .key(object.key) + .build() as HeadObjectRequest + // Execute the request + s3Client.headObject(request) + return true + } + catch (S3Exception e) { + if (e.statusCode() != 404) { + log.error "Unexpected response=${e.statusCode()} checking existence for object=${blobLocation} - cause: ${e.message}" + } + return false + } } /** diff --git a/src/test/groovy/io/seqera/wave/service/blob/BlobStateStoreImplTest.groovy b/src/test/groovy/io/seqera/wave/service/blob/BlobStateStoreImplTest.groovy index 61ad3465f..2898dfd29 100644 --- a/src/test/groovy/io/seqera/wave/service/blob/BlobStateStoreImplTest.groovy +++ b/src/test/groovy/io/seqera/wave/service/blob/BlobStateStoreImplTest.groovy @@ -23,10 +23,14 @@ import spock.lang.Specification import java.time.Duration import io.micronaut.context.annotation.Property +import io.micronaut.test.annotation.MockBean import io.micronaut.test.extensions.spock.annotation.MicronautTest import io.seqera.wave.configuration.BlobCacheConfig import io.seqera.wave.store.state.impl.StateProvider import jakarta.inject.Inject +import jakarta.inject.Named +import software.amazon.awssdk.services.s3.S3Client + /** * * @author Paolo Di Tommaso @@ -37,6 +41,10 @@ import jakarta.inject.Inject @MicronautTest class BlobStateStoreImplTest extends Specification { + @MockBean(S3Client) + @Named('BlobS3Client') + S3Client mockS3Client() { Mock(S3Client) } + @Inject BlobStoreImpl store