diff --git a/src/main/groovy/io/seqera/wave/auth/RegistryAuthServiceImpl.groovy b/src/main/groovy/io/seqera/wave/auth/RegistryAuthServiceImpl.groovy index c0ee2e0d5..a4252c4f3 100644 --- a/src/main/groovy/io/seqera/wave/auth/RegistryAuthServiceImpl.groovy +++ b/src/main/groovy/io/seqera/wave/auth/RegistryAuthServiceImpl.groovy @@ -22,6 +22,7 @@ import java.net.http.HttpRequest import java.net.http.HttpResponse import java.time.Duration import java.util.concurrent.CompletionException +import java.util.concurrent.ExecutorService import java.util.concurrent.TimeUnit import com.github.benmanes.caffeine.cache.AsyncLoadingCache @@ -33,6 +34,7 @@ import groovy.transform.CompileStatic import groovy.transform.PackageScope import groovy.transform.ToString import groovy.util.logging.Slf4j +import io.micronaut.scheduling.TaskExecutors import io.seqera.wave.configuration.HttpClientConfig import io.seqera.wave.exception.RegistryForwardException import io.seqera.wave.exception.RegistryUnauthorizedAccessException @@ -40,7 +42,9 @@ import io.seqera.wave.http.HttpClientFactory import io.seqera.wave.util.RegHelper import io.seqera.wave.util.Retryable import io.seqera.wave.util.StringUtils +import jakarta.annotation.PostConstruct import jakarta.inject.Inject +import jakarta.inject.Named import jakarta.inject.Singleton import static io.seqera.wave.WaveDefault.DOCKER_IO import static io.seqera.wave.auth.RegistryUtils.isServerError @@ -64,6 +68,10 @@ class RegistryAuthServiceImpl implements RegistryAuthService { @Inject private RegistryTokenStore tokenStore + @Inject + @Named(TaskExecutors.BLOCKING) + private ExecutorService ioExecutor + @Canonical @ToString(includePackage = false, includeNames = true) static private class CacheKey { @@ -101,16 +109,23 @@ class RegistryAuthServiceImpl implements RegistryAuthService { } // FIXME https://github.com/seqeralabs/wave/issues/747 - private AsyncLoadingCache cacheTokens = Caffeine - .newBuilder() - .maximumSize(10_000) - .expireAfterAccess(_1_HOUR.toMillis(), TimeUnit.MILLISECONDS) - .buildAsync(loader) + private AsyncLoadingCache cacheTokens @Inject private RegistryLookupService lookupService - @Inject RegistryCredentialsFactory credentialsFactory + @Inject + private RegistryCredentialsFactory credentialsFactory + + @PostConstruct + private void init() { + cacheTokens = Caffeine + .newBuilder() + .maximumSize(10_000) + .expireAfterAccess(_1_HOUR.toMillis(), TimeUnit.MILLISECONDS) + .executor(ioExecutor) + .buildAsync(loader) + } /** * Implements container registry login diff --git a/src/main/groovy/io/seqera/wave/auth/RegistryLookupServiceImpl.groovy b/src/main/groovy/io/seqera/wave/auth/RegistryLookupServiceImpl.groovy index 7f9db5f3f..13775d4a4 100644 --- a/src/main/groovy/io/seqera/wave/auth/RegistryLookupServiceImpl.groovy +++ b/src/main/groovy/io/seqera/wave/auth/RegistryLookupServiceImpl.groovy @@ -21,6 +21,7 @@ package io.seqera.wave.auth import java.net.http.HttpRequest import java.net.http.HttpResponse import java.util.concurrent.CompletionException +import java.util.concurrent.ExecutorService import java.util.concurrent.TimeUnit import com.github.benmanes.caffeine.cache.AsyncLoadingCache @@ -28,11 +29,14 @@ import com.github.benmanes.caffeine.cache.CacheLoader import com.github.benmanes.caffeine.cache.Caffeine import groovy.transform.CompileStatic import groovy.util.logging.Slf4j +import io.micronaut.scheduling.TaskExecutors import io.seqera.wave.configuration.HttpClientConfig import io.seqera.wave.exception.RegistryForwardException import io.seqera.wave.http.HttpClientFactory import io.seqera.wave.util.Retryable +import jakarta.annotation.PostConstruct import jakarta.inject.Inject +import jakarta.inject.Named import jakarta.inject.Singleton import static io.seqera.wave.WaveDefault.DOCKER_IO import static io.seqera.wave.WaveDefault.DOCKER_REGISTRY_1 @@ -55,6 +59,10 @@ class RegistryLookupServiceImpl implements RegistryLookupService { @Inject private RegistryAuthStore store + @Inject + @Named(TaskExecutors.BLOCKING) + private ExecutorService ioExecutor + private CacheLoader loader = new CacheLoader() { @Override RegistryAuth load(URI endpoint) throws Exception { @@ -74,11 +82,17 @@ class RegistryLookupServiceImpl implements RegistryLookupService { } // FIXME https://github.com/seqeralabs/wave/issues/747 - private AsyncLoadingCache cache = Caffeine + private AsyncLoadingCache cache + + @PostConstruct + void init() { + cache = Caffeine .newBuilder() .maximumSize(10_000) .expireAfterAccess(1, TimeUnit.HOURS) + .executor(ioExecutor) .buildAsync(loader) + } protected RegistryAuth lookup0(URI endpoint) { final httpClient = HttpClientFactory.followRedirectsHttpClient() diff --git a/src/main/groovy/io/seqera/wave/service/aws/AwsEcrService.groovy b/src/main/groovy/io/seqera/wave/service/aws/AwsEcrService.groovy index c32563675..3db479f4b 100644 --- a/src/main/groovy/io/seqera/wave/service/aws/AwsEcrService.groovy +++ b/src/main/groovy/io/seqera/wave/service/aws/AwsEcrService.groovy @@ -18,6 +18,7 @@ package io.seqera.wave.service.aws +import java.util.concurrent.ExecutorService import java.util.concurrent.TimeUnit import java.util.regex.Pattern @@ -27,7 +28,11 @@ import com.github.benmanes.caffeine.cache.Caffeine import groovy.transform.Canonical import groovy.transform.CompileStatic import groovy.util.logging.Slf4j +import io.micronaut.scheduling.TaskExecutors import io.seqera.wave.util.StringUtils +import jakarta.annotation.PostConstruct +import jakarta.inject.Inject +import jakarta.inject.Named import jakarta.inject.Singleton import software.amazon.awssdk.auth.credentials.AwsBasicCredentials import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider @@ -73,13 +78,22 @@ class AwsEcrService { } } - // FIXME https://github.com/seqeralabs/wave/issues/747 - private AsyncLoadingCache cache = Caffeine - .newBuilder() - .maximumSize(10_000) - .expireAfterWrite(3, TimeUnit.HOURS) - .buildAsync(loader) + @Inject + @Named(TaskExecutors.BLOCKING) + private ExecutorService ioExecutor + // FIXME https://github.com/seqeralabs/wave/issues/747 + private AsyncLoadingCache cache + + @PostConstruct + private void init() { + cache = Caffeine + .newBuilder() + .maximumSize(10_000) + .expireAfterWrite(3, TimeUnit.HOURS) + .executor(ioExecutor) + .buildAsync(loader) + } private EcrClient ecrClient(String accessKey, String secretKey, String region) { EcrClient.builder() diff --git a/src/main/groovy/io/seqera/wave/service/data/future/AbstractFutureStore.groovy b/src/main/groovy/io/seqera/wave/service/data/future/AbstractFutureStore.groovy index 130efe56d..f81dd27fa 100644 --- a/src/main/groovy/io/seqera/wave/service/data/future/AbstractFutureStore.groovy +++ b/src/main/groovy/io/seqera/wave/service/data/future/AbstractFutureStore.groovy @@ -26,6 +26,7 @@ import java.util.concurrent.TimeoutException import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import io.micronaut.context.annotation.Value +import io.micronaut.scheduling.TaskExecutors import io.seqera.wave.encoder.EncodingStrategy import jakarta.inject.Inject import jakarta.inject.Named @@ -55,7 +56,7 @@ abstract class AbstractFutureStore implements FutureStore { private volatile Duration pollInterval @Inject - @Named('future-store-executor') + @Named(TaskExecutors.BLOCKING) private ExecutorService executor AbstractFutureStore(FutureHash store, EncodingStrategy encodingStrategy) { diff --git a/src/main/groovy/io/seqera/wave/service/data/queue/AbstractMessageQueue.groovy b/src/main/groovy/io/seqera/wave/service/data/queue/AbstractMessageQueue.groovy index 1d89504d3..feb9f2508 100644 --- a/src/main/groovy/io/seqera/wave/service/data/queue/AbstractMessageQueue.groovy +++ b/src/main/groovy/io/seqera/wave/service/data/queue/AbstractMessageQueue.groovy @@ -20,6 +20,7 @@ package io.seqera.wave.service.data.queue import java.time.Duration import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.ExecutorService import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger @@ -61,21 +62,27 @@ abstract class AbstractMessageQueue implements Runnable { final private String name0 // FIXME https://github.com/seqeralabs/wave/issues/747 - final private AsyncCache closedClients = Caffeine - .newBuilder() - .expireAfterWrite(10, TimeUnit.MINUTES) - .buildAsync() + final private AsyncCache closedClients - AbstractMessageQueue(MessageQueue broker) { + AbstractMessageQueue(MessageQueue broker, ExecutorService ioExecutor) { final type = TypeHelper.getGenericType(this, 0) this.encoder = new MoshiEncodeStrategy(type) {} this.broker = broker + this.closedClients = createCache(ioExecutor) this.name0 = name() + '-thread-' + count.getAndIncrement() this.thread = new Thread(this, name0) this.thread.setDaemon(true) this.thread.start() } + private AsyncCache createCache(ExecutorService ioExecutor) { + Caffeine + .newBuilder() + .executor(ioExecutor) + .expireAfterWrite(10, TimeUnit.MINUTES) + .buildAsync() + } + protected abstract String name() protected abstract Duration pollInterval() diff --git a/src/main/groovy/io/seqera/wave/service/job/JobManager.groovy b/src/main/groovy/io/seqera/wave/service/job/JobManager.groovy index e10cde8d4..3b6f7cce2 100644 --- a/src/main/groovy/io/seqera/wave/service/job/JobManager.groovy +++ b/src/main/groovy/io/seqera/wave/service/job/JobManager.groovy @@ -20,6 +20,7 @@ package io.seqera.wave.service.job import java.time.Duration import java.time.Instant +import java.util.concurrent.ExecutorService import com.github.benmanes.caffeine.cache.AsyncCache import com.github.benmanes.caffeine.cache.Cache @@ -27,8 +28,11 @@ import com.github.benmanes.caffeine.cache.Caffeine import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import io.micronaut.context.annotation.Context +import io.micronaut.scheduling.TaskExecutors import jakarta.annotation.PostConstruct import jakarta.inject.Inject +import jakarta.inject.Named + /** * Implement the logic to handle Blob cache transfer (uploads) * @@ -51,6 +55,10 @@ class JobManager { @Inject private JobConfig config + @Inject + @Named(TaskExecutors.BLOCKING) + private ExecutorService ioExecutor + // FIXME https://github.com/seqeralabs/wave/issues/747 private AsyncCache debounceCache @@ -60,6 +68,7 @@ class JobManager { debounceCache = Caffeine .newBuilder() .expireAfterWrite(config.graceInterval.multipliedBy(2)) + .executor(ioExecutor) .buildAsync() queue.addConsumer((job)-> processJob(job)) } diff --git a/src/main/groovy/io/seqera/wave/service/logs/BuildLogServiceImpl.groovy b/src/main/groovy/io/seqera/wave/service/logs/BuildLogServiceImpl.groovy index 8be138dd1..5fadd46c9 100644 --- a/src/main/groovy/io/seqera/wave/service/logs/BuildLogServiceImpl.groovy +++ b/src/main/groovy/io/seqera/wave/service/logs/BuildLogServiceImpl.groovy @@ -80,7 +80,7 @@ class BuildLogServiceImpl implements BuildLogService { @Inject @Named(TaskExecutors.IO) - private volatile ExecutorService ioExecutor + private ExecutorService ioExecutor @PostConstruct private void init() { diff --git a/src/main/groovy/io/seqera/wave/service/pairing/socket/PairingOutboundQueue.groovy b/src/main/groovy/io/seqera/wave/service/pairing/socket/PairingOutboundQueue.groovy index 889af4871..61518a451 100644 --- a/src/main/groovy/io/seqera/wave/service/pairing/socket/PairingOutboundQueue.groovy +++ b/src/main/groovy/io/seqera/wave/service/pairing/socket/PairingOutboundQueue.groovy @@ -19,13 +19,16 @@ package io.seqera.wave.service.pairing.socket import java.time.Duration +import java.util.concurrent.ExecutorService import javax.annotation.PreDestroy import groovy.transform.CompileStatic import io.micronaut.context.annotation.Value +import io.micronaut.scheduling.TaskExecutors import io.seqera.wave.service.data.queue.AbstractMessageQueue import io.seqera.wave.service.data.queue.MessageQueue import io.seqera.wave.service.pairing.socket.msg.PairingMessage +import jakarta.inject.Named import jakarta.inject.Singleton /** * Implement a distributed queue for Wave pairing messages @@ -40,9 +43,10 @@ class PairingOutboundQueue extends AbstractMessageQueue { PairingOutboundQueue( MessageQueue broker, - @Value('${wave.pairing.channel.awaitTimeout:100ms}') Duration pollInterval + @Value('${wave.pairing.channel.awaitTimeout:100ms}') Duration pollInterval, + @Named(TaskExecutors.BLOCKING) ExecutorService ioExecutor ) { - super(broker) + super(broker, ioExecutor) this.pollInterval = pollInterval } diff --git a/src/main/groovy/io/seqera/wave/tower/client/connector/TowerConnector.groovy b/src/main/groovy/io/seqera/wave/tower/client/connector/TowerConnector.groovy index 27e12d87f..666c8325e 100644 --- a/src/main/groovy/io/seqera/wave/tower/client/connector/TowerConnector.groovy +++ b/src/main/groovy/io/seqera/wave/tower/client/connector/TowerConnector.groovy @@ -48,6 +48,7 @@ import io.seqera.wave.tower.client.TowerClient import io.seqera.wave.util.ExponentialAttempt import io.seqera.wave.util.JacksonHelper import io.seqera.wave.util.RegHelper +import jakarta.annotation.PostConstruct import jakarta.inject.Inject import jakarta.inject.Named import static io.seqera.wave.util.LongRndKey.rndHex @@ -92,10 +93,16 @@ abstract class TowerConnector { } } - private AsyncLoadingCache> refreshCache = Caffeine - .newBuilder() - .expireAfterWrite(1, TimeUnit.MINUTES) - .buildAsync(loader) + private AsyncLoadingCache> refreshCache + + @PostConstruct + void init() { + refreshCache = Caffeine + .newBuilder() + .expireAfterWrite(1, TimeUnit.MINUTES) + .executor(ioExecutor) + .buildAsync(loader) + } /** Only for testing - do not use */ Cache> refreshCache0() { diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index bd030f589..4c7e565b8 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -41,9 +41,6 @@ micronaut: stream-executor: type: FIXED number-of-threads: 16 - future-store-executor: - type : FIXED - number-of-threads : 32 netty: event-loops: stream-pool: diff --git a/src/test/groovy/io/seqera/wave/service/aws/AwsEcrServiceTest.groovy b/src/test/groovy/io/seqera/wave/service/aws/AwsEcrServiceTest.groovy index 20ce856ab..6f51810dd 100644 --- a/src/test/groovy/io/seqera/wave/service/aws/AwsEcrServiceTest.groovy +++ b/src/test/groovy/io/seqera/wave/service/aws/AwsEcrServiceTest.groovy @@ -18,24 +18,27 @@ package io.seqera.wave.service.aws - import spock.lang.Requires import spock.lang.Specification - +import io.micronaut.test.extensions.spock.annotation.MicronautTest +import jakarta.inject.Inject /** * * @author Paolo Di Tommaso */ +@MicronautTest class AwsEcrServiceTest extends Specification { + @Inject + AwsEcrService provider + @Requires({System.getenv('AWS_ACCESS_KEY_ID') && System.getenv('AWS_SECRET_ACCESS_KEY')}) def 'should get registry token' () { given: def accessKey = System.getenv('AWS_ACCESS_KEY_ID') def secretKey = System.getenv('AWS_SECRET_ACCESS_KEY') def REGION = 'eu-west-1' - def provider = new AwsEcrService() when: def creds = provider.getLoginToken(accessKey, secretKey, REGION, false).tokenize(":") @@ -49,9 +52,8 @@ class AwsEcrServiceTest extends Specification { thrown(Exception) } + @Requires({System.getenv('AWS_ACCESS_KEY_ID') && System.getenv('AWS_SECRET_ACCESS_KEY')}) def 'should check registry info' () { - given: - def provider = new AwsEcrService() expect: provider.getEcrHostInfo(null) == null provider.getEcrHostInfo('foo') == null @@ -59,7 +61,6 @@ class AwsEcrServiceTest extends Specification { provider.getEcrHostInfo('195996028523.dkr.ecr.eu-west-1.amazonaws.com/foo') == new AwsEcrService.AwsEcrHostInfo('195996028523', 'eu-west-1') and: provider.getEcrHostInfo('public.ecr.aws') == new AwsEcrService.AwsEcrHostInfo(null, 'us-east-1') - } def 'should check ecr registry' () { diff --git a/src/test/groovy/io/seqera/wave/service/data/queue/AbstractMessageQueueLocalTest.groovy b/src/test/groovy/io/seqera/wave/service/data/queue/AbstractMessageQueueLocalTest.groovy index 8923f6cf3..11a0fac6a 100644 --- a/src/test/groovy/io/seqera/wave/service/data/queue/AbstractMessageQueueLocalTest.groovy +++ b/src/test/groovy/io/seqera/wave/service/data/queue/AbstractMessageQueueLocalTest.groovy @@ -22,13 +22,17 @@ import spock.lang.Specification import java.time.Duration import java.util.concurrent.CompletableFuture +import java.util.concurrent.ExecutorService import java.util.concurrent.TimeUnit +import io.micronaut.scheduling.TaskExecutors import io.micronaut.test.extensions.spock.annotation.MicronautTest import io.seqera.wave.service.pairing.socket.PairingOutboundQueue import io.seqera.wave.service.pairing.socket.msg.PairingHeartbeat import io.seqera.wave.service.pairing.socket.msg.PairingMessage import jakarta.inject.Inject +import jakarta.inject.Named + /** * Test class {@link AbstractMessageQueue} using a {@link io.seqera.wave.service.data.queue.impl.LocalMessageQueue} * @@ -40,9 +44,12 @@ class AbstractMessageQueueLocalTest extends Specification { @Inject private MessageQueue broker + @Named(TaskExecutors.BLOCKING) + private ExecutorService ioExecutor + def 'should send and consume a request'() { given: - def queue = new PairingOutboundQueue(broker, Duration.ofMillis(100)) + def queue = new PairingOutboundQueue(broker, Duration.ofMillis(100), ioExecutor) when: def result = new CompletableFuture() @@ -58,7 +65,7 @@ class AbstractMessageQueueLocalTest extends Specification { def 'should validate '() { given: - def queue = new PairingOutboundQueue(broker, Duration.ofMillis(100)) + def queue = new PairingOutboundQueue(broker, Duration.ofMillis(100), ioExecutor) expect: queue.targetKey('foo') == 'pairing-outbound-queue/v1:foo' @@ -68,6 +75,4 @@ class AbstractMessageQueueLocalTest extends Specification { queue.close() } - - } diff --git a/src/test/groovy/io/seqera/wave/service/data/queue/AbstractMessageQueueRedisTest.groovy b/src/test/groovy/io/seqera/wave/service/data/queue/AbstractMessageQueueRedisTest.groovy index ad77e1d11..0353fa8bb 100644 --- a/src/test/groovy/io/seqera/wave/service/data/queue/AbstractMessageQueueRedisTest.groovy +++ b/src/test/groovy/io/seqera/wave/service/data/queue/AbstractMessageQueueRedisTest.groovy @@ -23,6 +23,7 @@ import spock.lang.Specification import java.time.Duration import java.util.concurrent.CompletableFuture +import java.util.concurrent.Executors import java.util.concurrent.TimeUnit import io.micronaut.context.ApplicationContext @@ -54,8 +55,9 @@ class AbstractMessageQueueRedisTest extends Specification implements RedisTestCo def 'should send and consume a request'() { given: + def executor = Executors.newCachedThreadPool() def broker = context.getBean(RedisMessageQueue) - def queue = new PairingOutboundQueue(broker, Duration.ofMillis(100)) + def queue = new PairingOutboundQueue(broker, Duration.ofMillis(100), executor) and: def result = new CompletableFuture() when: @@ -72,11 +74,12 @@ class AbstractMessageQueueRedisTest extends Specification implements RedisTestCo def 'should send and consume a request across instances'() { given: + def executor = Executors.newCachedThreadPool() def broker1 = context.getBean(RedisMessageQueue) - def queue1 = new PairingOutboundQueue(broker1, Duration.ofMillis(100)) + def queue1 = new PairingOutboundQueue(broker1, Duration.ofMillis(100), executor) and: def broker2 = context.getBean(RedisMessageQueue) - def queue2 = new PairingOutboundQueue(broker2, Duration.ofMillis(100)) + def queue2 = new PairingOutboundQueue(broker2, Duration.ofMillis(100), executor) and: def result = new CompletableFuture()