Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add blocking executor to async caches #759

Merged
merged 8 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 21 additions & 6 deletions src/main/groovy/io/seqera/wave/auth/RegistryAuthServiceImpl.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.time.Duration
import java.util.concurrent.CompletionException
import java.util.concurrent.ExecutorService
import java.util.concurrent.TimeUnit

import com.github.benmanes.caffeine.cache.AsyncLoadingCache
Expand All @@ -33,14 +34,17 @@ import groovy.transform.CompileStatic
import groovy.transform.PackageScope
import groovy.transform.ToString
import groovy.util.logging.Slf4j
import io.micronaut.scheduling.TaskExecutors
import io.seqera.wave.configuration.HttpClientConfig
import io.seqera.wave.exception.RegistryForwardException
import io.seqera.wave.exception.RegistryUnauthorizedAccessException
import io.seqera.wave.http.HttpClientFactory
import io.seqera.wave.util.RegHelper
import io.seqera.wave.util.Retryable
import io.seqera.wave.util.StringUtils
import jakarta.annotation.PostConstruct
import jakarta.inject.Inject
import jakarta.inject.Named
import jakarta.inject.Singleton
import static io.seqera.wave.WaveDefault.DOCKER_IO
import static io.seqera.wave.auth.RegistryUtils.isServerError
Expand All @@ -64,6 +68,10 @@ class RegistryAuthServiceImpl implements RegistryAuthService {
@Inject
private RegistryTokenStore tokenStore

@Inject
@Named(TaskExecutors.BLOCKING)
private ExecutorService ioExecutor

@Canonical
@ToString(includePackage = false, includeNames = true)
static private class CacheKey {
Expand Down Expand Up @@ -101,16 +109,23 @@ class RegistryAuthServiceImpl implements RegistryAuthService {
}

// FIXME https://github.com/seqeralabs/wave/issues/747
private AsyncLoadingCache<CacheKey, String> cacheTokens = Caffeine
.newBuilder()
.maximumSize(10_000)
.expireAfterAccess(_1_HOUR.toMillis(), TimeUnit.MILLISECONDS)
.buildAsync(loader)
private AsyncLoadingCache<CacheKey, String> cacheTokens

@Inject
private RegistryLookupService lookupService

@Inject RegistryCredentialsFactory credentialsFactory
@Inject
private RegistryCredentialsFactory credentialsFactory

@PostConstruct
private void init() {
cacheTokens = Caffeine
.newBuilder()
.maximumSize(10_000)
.expireAfterAccess(_1_HOUR.toMillis(), TimeUnit.MILLISECONDS)
.executor(ioExecutor)
.buildAsync(loader)
}

/**
* Implements container registry login
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,22 @@ package io.seqera.wave.auth
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.util.concurrent.CompletionException
import java.util.concurrent.ExecutorService
import java.util.concurrent.TimeUnit

import com.github.benmanes.caffeine.cache.AsyncLoadingCache
import com.github.benmanes.caffeine.cache.CacheLoader
import com.github.benmanes.caffeine.cache.Caffeine
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.micronaut.scheduling.TaskExecutors
import io.seqera.wave.configuration.HttpClientConfig
import io.seqera.wave.exception.RegistryForwardException
import io.seqera.wave.http.HttpClientFactory
import io.seqera.wave.util.Retryable
import jakarta.annotation.PostConstruct
import jakarta.inject.Inject
import jakarta.inject.Named
import jakarta.inject.Singleton
import static io.seqera.wave.WaveDefault.DOCKER_IO
import static io.seqera.wave.WaveDefault.DOCKER_REGISTRY_1
Expand All @@ -55,6 +59,10 @@ class RegistryLookupServiceImpl implements RegistryLookupService {
@Inject
private RegistryAuthStore store

@Inject
@Named(TaskExecutors.BLOCKING)
private ExecutorService ioExecutor

private CacheLoader<URI, RegistryAuth> loader = new CacheLoader<URI, RegistryAuth>() {
@Override
RegistryAuth load(URI endpoint) throws Exception {
Expand All @@ -74,11 +82,17 @@ class RegistryLookupServiceImpl implements RegistryLookupService {
}

// FIXME https://github.com/seqeralabs/wave/issues/747
private AsyncLoadingCache<URI, RegistryAuth> cache = Caffeine
private AsyncLoadingCache<URI, RegistryAuth> cache

@PostConstruct
void init() {
cache = Caffeine
.newBuilder()
.maximumSize(10_000)
.expireAfterAccess(1, TimeUnit.HOURS)
.executor(ioExecutor)
.buildAsync(loader)
}

protected RegistryAuth lookup0(URI endpoint) {
final httpClient = HttpClientFactory.followRedirectsHttpClient()
Expand Down
26 changes: 20 additions & 6 deletions src/main/groovy/io/seqera/wave/service/aws/AwsEcrService.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package io.seqera.wave.service.aws

import java.util.concurrent.ExecutorService
import java.util.concurrent.TimeUnit
import java.util.regex.Pattern

Expand All @@ -27,7 +28,11 @@ import com.github.benmanes.caffeine.cache.Caffeine
import groovy.transform.Canonical
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.micronaut.scheduling.TaskExecutors
import io.seqera.wave.util.StringUtils
import jakarta.annotation.PostConstruct
import jakarta.inject.Inject
import jakarta.inject.Named
import jakarta.inject.Singleton
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
Expand Down Expand Up @@ -73,13 +78,22 @@ class AwsEcrService {
}
}

// FIXME https://github.com/seqeralabs/wave/issues/747
private AsyncLoadingCache<AwsCreds, String> cache = Caffeine
.newBuilder()
.maximumSize(10_000)
.expireAfterWrite(3, TimeUnit.HOURS)
.buildAsync(loader)
@Inject
@Named(TaskExecutors.BLOCKING)
private ExecutorService ioExecutor

// FIXME https://github.com/seqeralabs/wave/issues/747
private AsyncLoadingCache<AwsCreds, String> cache

@PostConstruct
private void init() {
cache = Caffeine
.newBuilder()
.maximumSize(10_000)
.expireAfterWrite(3, TimeUnit.HOURS)
.executor(ioExecutor)
.buildAsync(loader)
}

private EcrClient ecrClient(String accessKey, String secretKey, String region) {
EcrClient.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import java.util.concurrent.TimeoutException
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.micronaut.context.annotation.Value
import io.micronaut.scheduling.TaskExecutors
import io.seqera.wave.encoder.EncodingStrategy
import jakarta.inject.Inject
import jakarta.inject.Named
Expand Down Expand Up @@ -55,7 +56,7 @@ abstract class AbstractFutureStore<V> implements FutureStore<String,V> {
private volatile Duration pollInterval

@Inject
@Named('future-store-executor')
@Named(TaskExecutors.BLOCKING)
private ExecutorService executor

AbstractFutureStore(FutureHash<String> store, EncodingStrategy<V> encodingStrategy) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package io.seqera.wave.service.data.queue

import java.time.Duration
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutorService
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

Expand Down Expand Up @@ -61,21 +62,27 @@ abstract class AbstractMessageQueue<M> implements Runnable {
final private String name0

// FIXME https://github.com/seqeralabs/wave/issues/747
final private AsyncCache<String,Boolean> closedClients = Caffeine
.newBuilder()
.expireAfterWrite(10, TimeUnit.MINUTES)
.buildAsync()
final private AsyncCache<String,Boolean> closedClients

AbstractMessageQueue(MessageQueue<String> broker) {
AbstractMessageQueue(MessageQueue<String> broker, ExecutorService ioExecutor) {
final type = TypeHelper.getGenericType(this, 0)
this.encoder = new MoshiEncodeStrategy<M>(type) {}
this.broker = broker
this.closedClients = createCache(ioExecutor)
this.name0 = name() + '-thread-' + count.getAndIncrement()
this.thread = new Thread(this, name0)
this.thread.setDaemon(true)
this.thread.start()
}

private AsyncCache<String,Boolean> createCache(ExecutorService ioExecutor) {
Caffeine
.newBuilder()
.executor(ioExecutor)
.expireAfterWrite(10, TimeUnit.MINUTES)
.buildAsync()
}

protected abstract String name()

protected abstract Duration pollInterval()
Expand Down
9 changes: 9 additions & 0 deletions src/main/groovy/io/seqera/wave/service/job/JobManager.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,19 @@ package io.seqera.wave.service.job

import java.time.Duration
import java.time.Instant
import java.util.concurrent.ExecutorService

import com.github.benmanes.caffeine.cache.AsyncCache
import com.github.benmanes.caffeine.cache.Cache
import com.github.benmanes.caffeine.cache.Caffeine
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.micronaut.context.annotation.Context
import io.micronaut.scheduling.TaskExecutors
import jakarta.annotation.PostConstruct
import jakarta.inject.Inject
import jakarta.inject.Named

/**
* Implement the logic to handle Blob cache transfer (uploads)
*
Expand All @@ -51,6 +55,10 @@ class JobManager {
@Inject
private JobConfig config

@Inject
@Named(TaskExecutors.BLOCKING)
private ExecutorService ioExecutor

// FIXME https://github.com/seqeralabs/wave/issues/747
private AsyncCache<String,Instant> debounceCache

Expand All @@ -60,6 +68,7 @@ class JobManager {
debounceCache = Caffeine
.newBuilder()
.expireAfterWrite(config.graceInterval.multipliedBy(2))
.executor(ioExecutor)
.buildAsync()
queue.addConsumer((job)-> processJob(job))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class BuildLogServiceImpl implements BuildLogService {

@Inject
@Named(TaskExecutors.IO)
private volatile ExecutorService ioExecutor
private ExecutorService ioExecutor

@PostConstruct
private void init() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@
package io.seqera.wave.service.pairing.socket

import java.time.Duration
import java.util.concurrent.ExecutorService
import javax.annotation.PreDestroy

import groovy.transform.CompileStatic
import io.micronaut.context.annotation.Value
import io.micronaut.scheduling.TaskExecutors
import io.seqera.wave.service.data.queue.AbstractMessageQueue
import io.seqera.wave.service.data.queue.MessageQueue
import io.seqera.wave.service.pairing.socket.msg.PairingMessage
import jakarta.inject.Named
import jakarta.inject.Singleton
/**
* Implement a distributed queue for Wave pairing messages
Expand All @@ -40,9 +43,10 @@ class PairingOutboundQueue extends AbstractMessageQueue<PairingMessage> {

PairingOutboundQueue(
MessageQueue<String> broker,
@Value('${wave.pairing.channel.awaitTimeout:100ms}') Duration pollInterval
@Value('${wave.pairing.channel.awaitTimeout:100ms}') Duration pollInterval,
@Named(TaskExecutors.BLOCKING) ExecutorService ioExecutor
) {
super(broker)
super(broker, ioExecutor)
this.pollInterval = pollInterval
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import io.seqera.wave.tower.client.TowerClient
import io.seqera.wave.util.ExponentialAttempt
import io.seqera.wave.util.JacksonHelper
import io.seqera.wave.util.RegHelper
import jakarta.annotation.PostConstruct
import jakarta.inject.Inject
import jakarta.inject.Named
import static io.seqera.wave.util.LongRndKey.rndHex
Expand Down Expand Up @@ -92,10 +93,16 @@ abstract class TowerConnector {
}
}

private AsyncLoadingCache<JwtRefreshParams, CompletableFuture<JwtAuth>> refreshCache = Caffeine
.newBuilder()
.expireAfterWrite(1, TimeUnit.MINUTES)
.buildAsync(loader)
private AsyncLoadingCache<JwtRefreshParams, CompletableFuture<JwtAuth>> refreshCache

@PostConstruct
void init() {
refreshCache = Caffeine
.newBuilder()
.expireAfterWrite(1, TimeUnit.MINUTES)
.executor(ioExecutor)
.buildAsync(loader)
}

/** Only for testing - do not use */
Cache<JwtRefreshParams, CompletableFuture<JwtAuth>> refreshCache0() {
Expand Down
3 changes: 0 additions & 3 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ micronaut:
stream-executor:
type: FIXED
number-of-threads: 16
future-store-executor:
type : FIXED
number-of-threads : 32
netty:
event-loops:
stream-pool:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,27 @@

package io.seqera.wave.service.aws


import spock.lang.Requires
import spock.lang.Specification


import io.micronaut.test.extensions.spock.annotation.MicronautTest
import jakarta.inject.Inject
/**
*
* @author Paolo Di Tommaso <[email protected]>
*/
@MicronautTest
class AwsEcrServiceTest extends Specification {

@Inject
AwsEcrService provider

@Requires({System.getenv('AWS_ACCESS_KEY_ID') && System.getenv('AWS_SECRET_ACCESS_KEY')})
def 'should get registry token' () {
given:
def accessKey = System.getenv('AWS_ACCESS_KEY_ID')
def secretKey = System.getenv('AWS_SECRET_ACCESS_KEY')
def REGION = 'eu-west-1'
def provider = new AwsEcrService()

when:
def creds = provider.getLoginToken(accessKey, secretKey, REGION, false).tokenize(":")
Expand All @@ -49,17 +52,15 @@ class AwsEcrServiceTest extends Specification {
thrown(Exception)
}

@Requires({System.getenv('AWS_ACCESS_KEY_ID') && System.getenv('AWS_SECRET_ACCESS_KEY')})
def 'should check registry info' () {
given:
def provider = new AwsEcrService()
expect:
provider.getEcrHostInfo(null) == null
provider.getEcrHostInfo('foo') == null
provider.getEcrHostInfo('195996028523.dkr.ecr.eu-west-1.amazonaws.com') == new AwsEcrService.AwsEcrHostInfo('195996028523', 'eu-west-1')
provider.getEcrHostInfo('195996028523.dkr.ecr.eu-west-1.amazonaws.com/foo') == new AwsEcrService.AwsEcrHostInfo('195996028523', 'eu-west-1')
and:
provider.getEcrHostInfo('public.ecr.aws') == new AwsEcrService.AwsEcrHostInfo(null, 'us-east-1')

}

def 'should check ecr registry' () {
Expand Down
Loading