Skip to content

Commit

Permalink
Migration to virtual threads - phase 1 (#746)
Browse files Browse the repository at this point in the history

Signed-off-by: Paolo Di Tommaso <[email protected]>
  • Loading branch information
pditommaso authored Nov 18, 2024
1 parent cf813e0 commit aaf0420
Show file tree
Hide file tree
Showing 24 changed files with 94 additions and 64 deletions.
3 changes: 1 addition & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,7 @@ jib {

run{
def envs = findProperty('micronautEnvs')
// note: "--enable-preview" is required to use virtual threads on Java 19 and 20
def args = ["-Dmicronaut.environments=$envs","--enable-preview"]
def args = ["-Dmicronaut.environments=$envs","-Djdk.tracePinnedThreads=short"]
if( environment['JVM_OPTS'] ) args.add(environment['JVM_OPTS'])
jvmArgs args
systemProperties 'DOCKER_USER': project.findProperty('DOCKER_USER') ?: environment['DOCKER_USER'],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import java.time.Duration
import java.util.concurrent.CompletionException
import java.util.concurrent.TimeUnit

import com.github.benmanes.caffeine.cache.AsyncLoadingCache
import com.github.benmanes.caffeine.cache.CacheLoader
import com.github.benmanes.caffeine.cache.Caffeine
import com.github.benmanes.caffeine.cache.LoadingCache
import groovy.json.JsonSlurper
import groovy.transform.Canonical
import groovy.transform.CompileStatic
Expand Down Expand Up @@ -100,11 +100,12 @@ class RegistryAuthServiceImpl implements RegistryAuthService {
return result
}

private LoadingCache<CacheKey, String> cacheTokens = Caffeine.newBuilder()
// FIXME https://github.com/seqeralabs/wave/issues/747
private AsyncLoadingCache<CacheKey, String> cacheTokens = Caffeine.newBuilder()
.newBuilder()
.maximumSize(10_000)
.expireAfterAccess(_1_HOUR.toMillis(), TimeUnit.MILLISECONDS)
.build(loader)
.buildAsync(loader)

@Inject
private RegistryLookupService lookupService
Expand Down Expand Up @@ -268,7 +269,8 @@ class RegistryAuthServiceImpl implements RegistryAuthService {
protected String getAuthToken(String image, RegistryAuth auth, RegistryCredentials creds) {
final key = new CacheKey(image, auth, creds)
try {
return cacheTokens.get(key)
// FIXME https://github.com/seqeralabs/wave/issues/747
return cacheTokens.synchronous().get(key)
}
catch (CompletionException e) {
// this catches the exception thrown in the cache loader lookup
Expand All @@ -286,7 +288,8 @@ class RegistryAuthServiceImpl implements RegistryAuthService {
*/
void invalidateAuthorization(String image, RegistryAuth auth, RegistryCredentials creds) {
final key = new CacheKey(image, auth, creds)
cacheTokens.invalidate(key)
// FIXME https://github.com/seqeralabs/wave/issues/747
cacheTokens.synchronous().invalidate(key)
tokenStore.remove(getStableKey(key))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import java.net.http.HttpResponse
import java.util.concurrent.CompletionException
import java.util.concurrent.TimeUnit

import com.github.benmanes.caffeine.cache.AsyncLoadingCache
import com.github.benmanes.caffeine.cache.CacheLoader
import com.github.benmanes.caffeine.cache.Caffeine
import com.github.benmanes.caffeine.cache.LoadingCache
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.seqera.wave.configuration.HttpClientConfig
Expand Down Expand Up @@ -73,11 +73,12 @@ class RegistryLookupServiceImpl implements RegistryLookupService {
}
}

private LoadingCache<URI, RegistryAuth> cache = Caffeine.newBuilder()
// FIXME https://github.com/seqeralabs/wave/issues/747
private AsyncLoadingCache<URI, RegistryAuth> cache = Caffeine.newBuilder()
.newBuilder()
.maximumSize(10_000)
.expireAfterAccess(1, TimeUnit.HOURS)
.build(loader)
.buildAsync(loader)

protected RegistryAuth lookup0(URI endpoint) {
final httpClient = HttpClientFactory.followRedirectsHttpClient()
Expand Down Expand Up @@ -116,7 +117,8 @@ class RegistryLookupServiceImpl implements RegistryLookupService {
RegistryInfo lookup(String registry) {
try {
final endpoint = registryEndpoint(registry)
final auth = cache.get(endpoint)
// FIXME https://github.com/seqeralabs/wave/issues/747
final auth = cache.synchronous().get(endpoint)
return new RegistryInfo(registry, endpoint, auth)
}
catch (CompletionException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import jakarta.inject.Inject
@Slf4j
@CompileStatic
@Controller("/")
@ExecuteOn(TaskExecutors.IO)
@ExecuteOn(TaskExecutors.BLOCKING)
class BuildController {

@Inject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ import static java.util.concurrent.CompletableFuture.completedFuture
@Slf4j
@CompileStatic
@Controller("/")
@ExecuteOn(TaskExecutors.IO)
@ExecuteOn(TaskExecutors.BLOCKING)
class ContainerController {

@Inject
Expand Down Expand Up @@ -181,13 +181,13 @@ class ContainerController {

@Deprecated
@Post('/container-token')
@ExecuteOn(TaskExecutors.IO)
@ExecuteOn(TaskExecutors.BLOCKING)
CompletableFuture<HttpResponse<SubmitContainerTokenResponse>> getToken(HttpRequest httpRequest, @Body SubmitContainerTokenRequest req) {
return getContainerImpl(httpRequest, req, false)
}

@Post('/v1alpha2/container')
@ExecuteOn(TaskExecutors.IO)
@ExecuteOn(TaskExecutors.BLOCKING)
CompletableFuture<HttpResponse<SubmitContainerTokenResponse>> getTokenV2(HttpRequest httpRequest, @Body SubmitContainerTokenRequest req) {
return getContainerImpl(httpRequest, req, true)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ import static io.seqera.wave.util.ContainerHelper.patchPlatformEndpoint
@Slf4j
@CompileStatic
@Controller("/")
@ExecuteOn(TaskExecutors.IO)
@ExecuteOn(TaskExecutors.BLOCKING)
class InspectController {

@Inject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ import static io.micronaut.http.HttpHeaders.WWW_AUTHENTICATE
@Requires(property = 'wave.metrics.enabled', value = 'true')
@Secured(SecurityRule.IS_AUTHENTICATED)
@Controller
@ExecuteOn(TaskExecutors.IO)
@ExecuteOn(TaskExecutors.BLOCKING)
class MetricsController {

@Inject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import jakarta.inject.Inject
@Slf4j
@CompileStatic
@Controller("/")
@ExecuteOn(TaskExecutors.IO)
@ExecuteOn(TaskExecutors.BLOCKING)
class MirrorController {

@Inject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ import reactor.core.publisher.Mono
@Slf4j
@CompileStatic
@Controller("/v2")
@ExecuteOn(TaskExecutors.IO)
@ExecuteOn(TaskExecutors.BLOCKING)
class RegistryProxyController {

@Inject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import jakarta.inject.Inject
@CompileStatic
@Requires(bean = ContainerScanService)
@Controller("/")
@ExecuteOn(TaskExecutors.IO)
@ExecuteOn(TaskExecutors.BLOCKING)
class ScanController {

@Inject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import io.seqera.wave.util.BuildInfo
@Slf4j
@Controller("/")
@CompileStatic
@ExecuteOn(TaskExecutors.IO)
@ExecuteOn(TaskExecutors.BLOCKING)
class ServiceInfoController {

@Value('${wave.landing.url}')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import io.seqera.wave.auth.RegistryAuthService
import jakarta.inject.Inject
import jakarta.validation.Valid

@ExecuteOn(TaskExecutors.IO)
@Controller("/validate-creds")
@ExecuteOn(TaskExecutors.BLOCKING)
class ValidateController {

@Inject RegistryAuthService loginService
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ import static io.seqera.wave.util.DataTimeUtils.formatTimestamp
@Slf4j
@CompileStatic
@Controller("/view")
@ExecuteOn(TaskExecutors.IO)
@ExecuteOn(TaskExecutors.BLOCKING)
class ViewController {

@Inject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ class ContainerAugmenter {
return result
}

synchronized protected Map layerBlob(String image, ContainerLayer layer) {
protected Map layerBlob(String image, ContainerLayer layer) {
log.debug "Adding layer: $layer to image: $client.registry.name/$image"
// store the layer blob in the cache
final String path = "$client.registry.name/v2/$image/blobs/$layer.gzipDigest"
Expand All @@ -295,7 +295,6 @@ class ContainerAugmenter {


protected Tuple2<String,Integer> updateImageManifest(String imageName, String imageManifest, String newImageConfigDigest, newImageConfigSize, boolean oci) {

// turn the json string into a json map
// and append the new layer
final manifest = (Map) new JsonSlurper().parseText(imageManifest)
Expand Down
13 changes: 11 additions & 2 deletions src/main/groovy/io/seqera/wave/core/RegistryProxyService.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package io.seqera.wave.core

import java.util.concurrent.CompletableFuture

import groovy.transform.CompileStatic
import groovy.transform.ToString
import groovy.util.logging.Slf4j
Expand Down Expand Up @@ -193,7 +195,7 @@ class RegistryProxyService {

String getImageDigest(String containerImage, PlatformId identity, boolean retryOnNotFound=false) {
try {
return getImageDigest0(containerImage, identity, retryOnNotFound)
return getImageDigest0(containerImage, identity, retryOnNotFound).get()
}
catch(Exception e) {
log.warn "Unable to retrieve digest for image '${containerImage}' -- cause: ${e.message}"
Expand All @@ -203,8 +205,15 @@ class RegistryProxyService {

static private List<Integer> RETRY_ON_NOT_FOUND = HTTP_RETRYABLE_ERRORS + 404

// note: return a CompletableFuture to force micronaut to use caffeine AsyncCache
// that provides a workaround about the use of virtual threads with SyncCache
// see https://github.com/ben-manes/caffeine/issues/1468#issuecomment-1906733926
@Cacheable(value = 'cache-registry-proxy', atomic = true, parameters = ['image'])
protected String getImageDigest0(String image, PlatformId identity, boolean retryOnNotFound) {
protected CompletableFuture<String> getImageDigest0(String image, PlatformId identity, boolean retryOnNotFound) {
CompletableFuture.completedFuture(getImageDigest1(image, identity, retryOnNotFound))
}

protected String getImageDigest1(String image, PlatformId identity, boolean retryOnNotFound) {
final coords = ContainerCoordinates.parse(image)
final route = RoutePath.v2manifestPath(coords, identity)
final proxyClient = client(route)
Expand Down
15 changes: 11 additions & 4 deletions src/main/groovy/io/seqera/wave/http/HttpClientFactory.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.net.http.HttpClient
import java.time.Duration
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.locks.ReentrantLock

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
Expand All @@ -39,9 +40,9 @@ class HttpClientFactory {

static private Duration timeout = Duration.ofSeconds(20)

static private final Object l1 = new Object()
static private final ReentrantLock l1 = new ReentrantLock()

static private final Object l2 = new Object()
static private final ReentrantLock l2 = new ReentrantLock()

private static HttpClient client1

Expand All @@ -51,20 +52,26 @@ class HttpClientFactory {
static HttpClient followRedirectsHttpClient() {
if( client1!=null )
return client1
synchronized (l1) {
l1.lock()
try {
if( client1!=null )
return client1
return client1=followRedirectsHttpClient0()
} finally {
l1.unlock()
}
}

static HttpClient neverRedirectsHttpClient() {
if( client2!=null )
return client2
synchronized (l2) {
l2.lock()
try {
if( client2!=null )
return client2
return client2=neverRedirectsHttpClient0()
} finally {
l2.unlock()
}
}

Expand Down
10 changes: 6 additions & 4 deletions src/main/groovy/io/seqera/wave/service/aws/AwsEcrService.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ package io.seqera.wave.service.aws
import java.util.concurrent.TimeUnit
import java.util.regex.Pattern

import com.github.benmanes.caffeine.cache.AsyncLoadingCache
import com.github.benmanes.caffeine.cache.CacheLoader
import com.github.benmanes.caffeine.cache.Caffeine
import com.github.benmanes.caffeine.cache.LoadingCache
import groovy.transform.Canonical
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
Expand Down Expand Up @@ -73,11 +73,12 @@ class AwsEcrService {
}
}

private LoadingCache<AwsCreds, String> cache = Caffeine.newBuilder()
// FIXME https://github.com/seqeralabs/wave/issues/747
private AsyncLoadingCache<AwsCreds, String> cache = Caffeine.newBuilder()
.newBuilder()
.maximumSize(10_000)
.expireAfterWrite(3, TimeUnit.HOURS)
.build(loader)
.buildAsync(loader)


private EcrClient ecrClient(String accessKey, String secretKey, String region) {
Expand Down Expand Up @@ -126,7 +127,8 @@ class AwsEcrService {
try {
// get the token from the cache, if missing the it's automatically
// fetch using the AWS ECR client
return cache.get(new AwsCreds(accessKey,secretKey,region,isPublic))
// FIXME https://github.com/seqeralabs/wave/issues/747
return cache.synchronous().get(new AwsCreds(accessKey,secretKey,region,isPublic))
}
catch (Exception e) {
final type = isPublic ? "ECR public" : "ECR"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

import com.github.benmanes.caffeine.cache.Cache
import com.github.benmanes.caffeine.cache.AsyncCache
import com.github.benmanes.caffeine.cache.Caffeine
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
Expand Down Expand Up @@ -60,10 +60,11 @@ abstract class AbstractMessageQueue<M> implements Runnable {

final private String name0

final private Cache<String,Boolean> closedClients = Caffeine.newBuilder()
// FIXME https://github.com/seqeralabs/wave/issues/747
final private AsyncCache<String,Boolean> closedClients = Caffeine.newBuilder()
.newBuilder()
.expireAfterWrite(10, TimeUnit.MINUTES)
.build()
.buildAsync()

AbstractMessageQueue(MessageQueue<String> broker) {
final type = TypeHelper.getGenericType(this, 0)
Expand Down Expand Up @@ -149,13 +150,15 @@ abstract class AbstractMessageQueue<M> implements Runnable {

@Override
void run() {
// FIXME https://github.com/seqeralabs/wave/issues/747
final clientsCache = closedClients.synchronous()
while( !thread.isInterrupted() ) {
try {
int sent=0
final clients = new HashMap<String,MessageSender<String>>(this.clients)
for( Map.Entry<String,MessageSender<String>> entry : clients ) {
// ignore clients marked as closed
if( closedClients.getIfPresent(entry.key))
if( clientsCache.getIfPresent(entry.key))
continue
// infer the target queue from the client key
final target = targetFromClientKey(entry.key)
Expand All @@ -173,7 +176,7 @@ abstract class AbstractMessageQueue<M> implements Runnable {
// offer back the value to be processed again
broker.offer(target, value)
if( e.message?.contains('close') ) {
closedClients.put(entry.key, true)
clientsCache.put(entry.key, true)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ abstract class AbstractMessageStream<M> implements Closeable {
* The {@link Predicate} to be invoked when a stream message is consumed (read from) the stream.
*/
void addConsumer(String streamId, MessageConsumer<M> consumer) {
// the use of synchronized block is meant to prevent a race condition while
// updating the 'listeners' from concurrent invocations.
// however, considering the addConsumer is invoked during the initialization phase
// (and therefore in the same thread) in should not be really needed.
synchronized (listeners) {
if( listeners.containsKey(streamId))
throw new IllegalStateException("Only one consumer can be defined for each stream - offending streamId=$streamId; consumer=$consumer")
Expand Down
Loading

0 comments on commit aaf0420

Please sign in to comment.