Skip to content

Commit

Permalink
Improve Wave error handling
Browse files Browse the repository at this point in the history
Signed-off-by: Paolo Di Tommaso <[email protected]>
  • Loading branch information
pditommaso committed Oct 7, 2023
1 parent 3d6b735 commit d47e8b0
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 18 deletions.
40 changes: 22 additions & 18 deletions plugins/nf-wave/src/main/io/seqera/wave/plugin/WaveClient.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,9 @@ class WaveClient {

final resp = httpSend(req)
final code = resp.statusCode()
final body = resp.body()
if( code>=200 && code<400 ) {
log.debug "Wave container config response: [$code] ${resp.body()}"
log.debug "Wave container config response: [$code] ${body}"
return jsonToContainerConfig(resp.body())
}
throw new BadResponseException("Unexpected response for containerContainerConfigUrl \'$configUrl\': [${resp.statusCode()}] ${resp.body()}")
Expand Down Expand Up @@ -551,12 +552,13 @@ class WaveClient {
.build()
final begin = System.currentTimeMillis()
final resp = httpSend(req)
final body = resp.body()
final code = resp.statusCode()
if( code>=200 && code<400 ) {
log.debug "Wave container available in ${nextflow.util.Duration.of(System.currentTimeMillis()-begin)}: [$code] ${resp.body()}"
log.debug "Wave container available in ${nextflow.util.Duration.of(System.currentTimeMillis()-begin)}: [$code] ${body}"
}
else
throw new BadResponseException("Unexpected response for \'$manifest\': [${resp.statusCode()}] ${resp.body()}")
throw new BadResponseException("Unexpected response for \'$manifest\': [${code}] ${body}")
}

static protected boolean isCondaLocalFile(String value) {
Expand Down Expand Up @@ -587,7 +589,9 @@ class WaveClient {
.build()

final resp = httpSend(req)
log.debug "Refresh cookie response: [${resp.statusCode()}] ${resp.body()}"
final code = resp.statusCode()
final body = resp.body()
log.debug "Refresh cookie response: [${code}] ${body}"
if( resp.statusCode() != 200 )
return false

Expand Down Expand Up @@ -623,39 +627,39 @@ class WaveClient {
return null
}

protected <T> RetryPolicy<T> retryPolicy(Predicate<? extends Throwable> cond) {
protected <T> RetryPolicy<T> retryPolicy(Predicate<? extends Throwable> cond, Predicate<T> handle) {
final cfg = config.retryOpts()
final listener = new EventListener<ExecutionAttemptedEvent<T>>() {
@Override
void accept(ExecutionAttemptedEvent<T> event) throws Throwable {
log.debug("Wave connection failure - attempt: ${event.attemptCount}", event.lastFailure)
def msg = "Wave connection failure - attempt: ${event.attemptCount}"
if( event.lastResult!=null )
msg += "; response: ${event.lastResult}"
if( event.lastFailure != null )
msg += "; exception: [${event.lastFailure.class.name}] ${event.lastFailure.message}"
log.debug(msg)
}
}
return RetryPolicy.<T>builder()
.handleIf(cond)
.handleResultIf(handle)
.withBackoff(cfg.delay.toMillis(), cfg.maxDelay.toMillis(), ChronoUnit.MILLIS)
.withMaxAttempts(cfg.maxAttempts)
.withJitter(cfg.jitter)
.onRetry(listener)
.build()
}

protected <T> T safeApply(CheckedSupplier<T> action) {
final cond = (e -> e instanceof IOException) as Predicate<? extends Throwable>
final policy = retryPolicy(cond)
protected <T> HttpResponse<T> safeApply(CheckedSupplier action) {
final retryOnException = (e -> e instanceof IOException) as Predicate<? extends Throwable>
final retryOnStatusCode = ((HttpResponse<T> resp) -> resp.statusCode() in SERVER_ERRORS) as Predicate<HttpResponse<T>>
final policy = retryPolicy(retryOnException, retryOnStatusCode)
return Failsafe.with(policy).get(action)
}

static private final List<Integer> SERVER_ERRORS = [429,502,503,504]
static private final List<Integer> SERVER_ERRORS = [429,500,502,503,504]

protected HttpResponse<String> httpSend(HttpRequest req) {
return safeApply(() -> {
final resp=httpClient.send(req, HttpResponse.BodyHandlers.ofString())
if( resp.statusCode() in SERVER_ERRORS) {
// throws an IOException so that the condition is handled by the retry policy
throw new IOException("Unexpected server response code ${resp.statusCode()} - message: ${resp.body()}")
}
return resp
})
return safeApply(() -> httpClient.send(req, HttpResponse.BodyHandlers.ofString()))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package io.seqera.wave.plugin

import static java.nio.file.StandardOpenOption.*

import java.net.http.HttpRequest
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.attribute.FileTime
Expand Down Expand Up @@ -1219,4 +1220,45 @@ class WaveClientTest extends Specification {
'https://foo.com' | true
}

def 'should retry http request' () {

given:
int requestCount=0
HttpHandler handler = { HttpExchange exchange ->
if( ++requestCount<3 ) {
exchange.getResponseHeaders().add("Content-Type", "text/plain")
exchange.sendResponseHeaders(503, 0)
exchange.getResponseBody().close()
}
else {
def body = 'Hello world!'
exchange.getResponseHeaders().add("Content-Type", "text/plain")
exchange.sendResponseHeaders(200, body.size())
exchange.getResponseBody().write(body.bytes)
exchange.getResponseBody().close()
}
}

HttpServer server = HttpServer.create(new InetSocketAddress(9901), 0);
server.createContext("/", handler);
server.start()

def session = Mock(Session) {getConfig() >> [:] }
def client = new WaveClient(session)

when:
def request = HttpRequest.newBuilder().uri(new URI('http://localhost:9901/foo.txt')).build()
def response = client.httpSend(request)
then:
response.statusCode() == 200
response.body() == 'Hello world!'
and:
requestCount == 3

cleanup:
server?.stop(0)

}


}

0 comments on commit d47e8b0

Please sign in to comment.