Skip to content

Commit

Permalink
Check block existence with object operation (#750)
Browse files Browse the repository at this point in the history

Signed-off-by: Paolo Di Tommaso <[email protected]>
Co-authored-by: Munish Chouhan <[email protected]>
  • Loading branch information
pditommaso and munishchouhan authored Nov 20, 2024
1 parent e24ec62 commit 86ef526
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@
*/
package io.seqera.wave.service.blob.impl

import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
Expand All @@ -29,7 +26,6 @@ import io.seqera.wave.configuration.BlobCacheConfig
import io.seqera.wave.configuration.HttpClientConfig
import io.seqera.wave.core.RegistryProxyService
import io.seqera.wave.core.RoutePath
import io.seqera.wave.http.HttpClientFactory
import io.seqera.wave.service.blob.BlobCacheService
import io.seqera.wave.service.blob.BlobEntry
import io.seqera.wave.service.blob.BlobSigningService
Expand All @@ -38,14 +34,16 @@ import io.seqera.wave.service.job.JobHandler
import io.seqera.wave.service.job.JobService
import io.seqera.wave.service.job.JobSpec
import io.seqera.wave.service.job.JobState
import io.seqera.wave.util.BucketTokenizer
import io.seqera.wave.util.Escape
import io.seqera.wave.util.Retryable
import io.seqera.wave.util.StringUtils
import jakarta.annotation.PostConstruct
import jakarta.inject.Inject
import jakarta.inject.Named
import jakarta.inject.Singleton
import static io.seqera.wave.WaveDefault.HTTP_SERVER_ERRORS
import software.amazon.awssdk.services.s3.S3Client
import software.amazon.awssdk.services.s3.model.HeadObjectRequest
import software.amazon.awssdk.services.s3.model.S3Exception
/**
* Implements cache for container image layer blobs
*
Expand Down Expand Up @@ -79,11 +77,12 @@ class BlobCacheServiceImpl implements BlobCacheService, JobHandler<BlobEntry> {
@Inject
private HttpClientConfig httpConfig

private HttpClient httpClient
@Inject
@Named('BlobS3Client')
private S3Client s3Client

@PostConstruct
private void init() {
httpClient = HttpClientFactory.followRedirectsHttpClient()
log.info "Creating Blob cache service - $blobConfig"
}

Expand All @@ -98,7 +97,7 @@ class BlobCacheServiceImpl implements BlobCacheService, JobHandler<BlobEntry> {
// therefore it's safe to check and return directly
// if it exists (no risk of returning a partial upload)
// https://developers.cloudflare.com/r2/reference/consistency/
if( blobExists(info.locationUri) && !debug ) {
if( blobExists(info.objectUri) && !debug ) {
log.debug "== Blob cache exists for object '${info.locationUri}'"
return info.cached()
}
Expand All @@ -113,21 +112,24 @@ class BlobCacheServiceImpl implements BlobCacheService, JobHandler<BlobEntry> {
return result?.withLocation(locationUri)
}

protected boolean blobExists(String uri) {
final request = HttpRequest
.newBuilder(new URI(uri))
.method("HEAD", HttpRequest.BodyPublishers.noBody())
.build()

// retry strategy
final retryable = Retryable
.<HttpResponse<String>>of(httpConfig)
.retryIf((response) -> response.statusCode() in HTTP_SERVER_ERRORS)
.onRetry((event) -> log.warn("Unable to connect '$uri' - event: $event"))

// submit the request
final resp = retryable.apply(()-> httpClient.send(request, HttpResponse.BodyHandlers.ofString()))
return resp.statusCode() == 200
protected boolean blobExists(String blobLocation) {
try {
final object = BucketTokenizer.from(blobLocation)
final request = HeadObjectRequest
.builder()
.bucket(object.bucket)
.key(object.key)
.build() as HeadObjectRequest
// Execute the request
s3Client.headObject(request)
return true
}
catch (S3Exception e) {
if (e.statusCode() != 404) {
log.error "Unexpected response=${e.statusCode()} checking existence for object=${blobLocation} - cause: ${e.message}"
}
return false
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,14 @@ import spock.lang.Specification
import java.time.Duration

import io.micronaut.context.annotation.Property
import io.micronaut.test.annotation.MockBean
import io.micronaut.test.extensions.spock.annotation.MicronautTest
import io.seqera.wave.configuration.BlobCacheConfig
import io.seqera.wave.store.state.impl.StateProvider
import jakarta.inject.Inject
import jakarta.inject.Named
import software.amazon.awssdk.services.s3.S3Client

/**
*
* @author Paolo Di Tommaso <[email protected]>
Expand All @@ -37,6 +41,10 @@ import jakarta.inject.Inject
@MicronautTest
class BlobStateStoreImplTest extends Specification {

@MockBean(S3Client)
@Named('BlobS3Client')
S3Client mockS3Client() { Mock(S3Client) }

@Inject
BlobStoreImpl store

Expand Down

0 comments on commit 86ef526

Please sign in to comment.