From d47e8b0768befe37190a0dac4db4bcd4119afa76 Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Sat, 7 Oct 2023 10:53:56 +0200 Subject: [PATCH] Improve Wave error handling Signed-off-by: Paolo Di Tommaso --- .../io/seqera/wave/plugin/WaveClient.groovy | 40 ++++++++++-------- .../seqera/wave/plugin/WaveClientTest.groovy | 42 +++++++++++++++++++ 2 files changed, 64 insertions(+), 18 deletions(-) diff --git a/plugins/nf-wave/src/main/io/seqera/wave/plugin/WaveClient.groovy b/plugins/nf-wave/src/main/io/seqera/wave/plugin/WaveClient.groovy index a5909daf3a..891b4f5a0f 100644 --- a/plugins/nf-wave/src/main/io/seqera/wave/plugin/WaveClient.groovy +++ b/plugins/nf-wave/src/main/io/seqera/wave/plugin/WaveClient.groovy @@ -306,8 +306,9 @@ class WaveClient { final resp = httpSend(req) final code = resp.statusCode() + final body = resp.body() if( code>=200 && code<400 ) { - log.debug "Wave container config response: [$code] ${resp.body()}" + log.debug "Wave container config response: [$code] ${body}" return jsonToContainerConfig(resp.body()) } throw new BadResponseException("Unexpected response for containerContainerConfigUrl \'$configUrl\': [${resp.statusCode()}] ${resp.body()}") @@ -551,12 +552,13 @@ class WaveClient { .build() final begin = System.currentTimeMillis() final resp = httpSend(req) + final body = resp.body() final code = resp.statusCode() if( code>=200 && code<400 ) { - log.debug "Wave container available in ${nextflow.util.Duration.of(System.currentTimeMillis()-begin)}: [$code] ${resp.body()}" + log.debug "Wave container available in ${nextflow.util.Duration.of(System.currentTimeMillis()-begin)}: [$code] ${body}" } else - throw new BadResponseException("Unexpected response for \'$manifest\': [${resp.statusCode()}] ${resp.body()}") + throw new BadResponseException("Unexpected response for \'$manifest\': [${code}] ${body}") } static protected boolean isCondaLocalFile(String value) { @@ -587,7 +589,9 @@ class WaveClient { .build() final resp = httpSend(req) - log.debug "Refresh cookie response: [${resp.statusCode()}] ${resp.body()}" + final code = resp.statusCode() + final body = resp.body() + log.debug "Refresh cookie response: [${code}] ${body}" if( resp.statusCode() != 200 ) return false @@ -623,16 +627,22 @@ class WaveClient { return null } - protected RetryPolicy retryPolicy(Predicate cond) { + protected RetryPolicy retryPolicy(Predicate cond, Predicate handle) { final cfg = config.retryOpts() final listener = new EventListener>() { @Override void accept(ExecutionAttemptedEvent event) throws Throwable { - log.debug("Wave connection failure - attempt: ${event.attemptCount}", event.lastFailure) + def msg = "Wave connection failure - attempt: ${event.attemptCount}" + if( event.lastResult!=null ) + msg += "; response: ${event.lastResult}" + if( event.lastFailure != null ) + msg += "; exception: [${event.lastFailure.class.name}] ${event.lastFailure.message}" + log.debug(msg) } } return RetryPolicy.builder() .handleIf(cond) + .handleResultIf(handle) .withBackoff(cfg.delay.toMillis(), cfg.maxDelay.toMillis(), ChronoUnit.MILLIS) .withMaxAttempts(cfg.maxAttempts) .withJitter(cfg.jitter) @@ -640,22 +650,16 @@ class WaveClient { .build() } - protected T safeApply(CheckedSupplier action) { - final cond = (e -> e instanceof IOException) as Predicate - final policy = retryPolicy(cond) + protected HttpResponse safeApply(CheckedSupplier action) { + final retryOnException = (e -> e instanceof IOException) as Predicate + final retryOnStatusCode = ((HttpResponse resp) -> resp.statusCode() in SERVER_ERRORS) as Predicate> + final policy = retryPolicy(retryOnException, retryOnStatusCode) return Failsafe.with(policy).get(action) } - static private final List SERVER_ERRORS = [429,502,503,504] + static private final List SERVER_ERRORS = [429,500,502,503,504] protected HttpResponse httpSend(HttpRequest req) { - return safeApply(() -> { - final resp=httpClient.send(req, HttpResponse.BodyHandlers.ofString()) - if( resp.statusCode() in SERVER_ERRORS) { - // throws an IOException so that the condition is handled by the retry policy - throw new IOException("Unexpected server response code ${resp.statusCode()} - message: ${resp.body()}") - } - return resp - }) + return safeApply(() -> httpClient.send(req, HttpResponse.BodyHandlers.ofString())) } } diff --git a/plugins/nf-wave/src/test/io/seqera/wave/plugin/WaveClientTest.groovy b/plugins/nf-wave/src/test/io/seqera/wave/plugin/WaveClientTest.groovy index 7bcc9fd1da..36108f236f 100644 --- a/plugins/nf-wave/src/test/io/seqera/wave/plugin/WaveClientTest.groovy +++ b/plugins/nf-wave/src/test/io/seqera/wave/plugin/WaveClientTest.groovy @@ -19,6 +19,7 @@ package io.seqera.wave.plugin import static java.nio.file.StandardOpenOption.* +import java.net.http.HttpRequest import java.nio.file.Files import java.nio.file.Path import java.nio.file.attribute.FileTime @@ -1219,4 +1220,45 @@ class WaveClientTest extends Specification { 'https://foo.com' | true } + def 'should retry http request' () { + + given: + int requestCount=0 + HttpHandler handler = { HttpExchange exchange -> + if( ++requestCount<3 ) { + exchange.getResponseHeaders().add("Content-Type", "text/plain") + exchange.sendResponseHeaders(503, 0) + exchange.getResponseBody().close() + } + else { + def body = 'Hello world!' + exchange.getResponseHeaders().add("Content-Type", "text/plain") + exchange.sendResponseHeaders(200, body.size()) + exchange.getResponseBody().write(body.bytes) + exchange.getResponseBody().close() + } + } + + HttpServer server = HttpServer.create(new InetSocketAddress(9901), 0); + server.createContext("/", handler); + server.start() + + def session = Mock(Session) {getConfig() >> [:] } + def client = new WaveClient(session) + + when: + def request = HttpRequest.newBuilder().uri(new URI('http://localhost:9901/foo.txt')).build() + def response = client.httpSend(request) + then: + response.statusCode() == 200 + response.body() == 'Hello world!' + and: + requestCount == 3 + + cleanup: + server?.stop(0) + + } + + }