Skip to content

Commit

Permalink
Merge pull request #33058 from vespa-engine/hmusum/add-rpc-method-for…
Browse files Browse the repository at this point in the history
…-triggering-download

Add rpc method for triggering download of file references
  • Loading branch information
hakonhall authored Jan 2, 2025
2 parents ec8fe39 + e4add27 commit 7a05b8b
Showing 1 changed file with 43 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import com.yahoo.vespa.filedistribution.FileReceiver;
import com.yahoo.vespa.filedistribution.FileReferenceData;
import com.yahoo.vespa.filedistribution.FileReferenceDownload;
import com.yahoo.yolean.Exceptions;

import java.nio.ByteBuffer;
import java.time.Duration;
Expand All @@ -66,6 +67,7 @@

import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType;
import static java.util.logging.Level.FINE;
import static java.util.logging.Level.FINEST;
import static java.util.logging.Level.WARNING;

/**
Expand Down Expand Up @@ -225,6 +227,12 @@ private void setUpFileDistributionHandlers() {
.methodDesc("set which file references to download")
.paramDesc(0, "file references", "file reference to download")
.returnDesc(0, "ret", "0 if success, 1 otherwise"));
getSupervisor().addMethod(new Method("filedistribution.triggerDownload", "Ss", "i", this::triggerDownload)
.requireCapabilities(Capability.CONFIGSERVER__FILEDISTRIBUTION_API)
.methodDesc("trigger download of file references from supplied source")
.paramDesc(0, "file references", "file reference to download")
.paramDesc(1, "address", "address (jrt spec) for a source to download from")
.returnDesc(0, "ret", "0 if success, 1 otherwise"));
}

/**
Expand Down Expand Up @@ -280,7 +288,7 @@ void configActivated(ApplicationId applicationId) {
// Doing cancel here deals with the case where the timer is already running or has not run, so
// there is no need for any extra check.
if (delayedConfigResponse.cancel()) {
log.log(FINE, () -> logPre + "Timer cancelled for " + delayedConfigResponse.request);
log.log(FINEST, () -> logPre + "Timer cancelled for " + delayedConfigResponse.request);
// Do not wait for this request if we were unable to execute
if (addToRequestQueue(delayedConfigResponse.request, false, completionService)) {
responsesSent++;
Expand All @@ -307,7 +315,7 @@ public void applicationRemoved(ApplicationId applicationId) {
}

public void respond(JRTServerConfigRequest request) {
log.log(FINE, () -> "Trace when responding:\n" + request.getRequestTrace().toString());
log.log(FINEST, () -> "Trace when responding to " + request + ":\n" + request.getRequestTrace().toString());
request.getRequest().returnRequest();
}

Expand Down Expand Up @@ -573,17 +581,43 @@ private void setFileReferencesToDownload(Request req) {
var peerSpec = client.peerSpec();
Stream.of(fileReferenceStrings)
.map(FileReference::new)
.forEach(fileReference -> downloadFromSource(fileReference, client, peerSpec));
.forEach(fileReference -> downloadFromSource(fileReference, client.toString(), peerSpec));
req.returnValues().add(new Int32Value(0));
});
}

private void downloadFromSource(FileReference fileReference, Target client, Spec peerSpec) {
var fileReferenceDownload = new FileReferenceDownload(fileReference, client.toString());
var downloading = downloader.downloadFromSource(fileReferenceDownload, peerSpec);
log.log(FINE, () -> downloading
? "Downloading file reference " + fileReference.value() + " from " + peerSpec
: "File reference " + fileReference.value() + " already exists");
private void triggerDownload(Request req) {
log.log(FINE, () -> "Got request triggerDownload from " + req.target());
req.detach();
rpcAuthorizer.authorizeFileRequest(req)
.thenRun(() -> { // okay to do in authorizer thread as downloadFromSource is async
if (req.parameters().size() < 2) {
log.log(FINE, () -> "No source to download from in triggerDownload()");
return;
}
// Download directly from the source that has the file reference, which
// is the client that sent the request
List<String> fileReferenceStrings = List.of(req.parameters().get(0).asStringArray());
var peerSpec = new Spec(req.parameters().get(1).asString());
log.log(FINE, () -> "setFileReferencesToDownload: " + fileReferenceStrings + ", spec=" + peerSpec);
fileReferenceStrings.stream()
.map(FileReference::new)
.forEach(fileReference -> downloadFromSource(fileReference, peerSpec.host(), peerSpec));
});
req.returnValues().add(new Int32Value(0));
req.returnRequest();
}

private void downloadFromSource(FileReference fileReference, String client, Spec peerSpec) {
var fileReferenceDownload = new FileReferenceDownload(fileReference, client, false);
try {
var downloading = downloader.downloadFromSource(fileReferenceDownload, peerSpec);
log.log(FINE, () -> downloading
? "Downloading file reference " + fileReference.value() + " from " + peerSpec
: "File reference " + fileReference.value() + " already exists");
} catch (Exception e) {
log.log(WARNING, "download from source failed: ", Exceptions.toMessageString(e));
}
}

}

0 comments on commit 7a05b8b

Please sign in to comment.