Skip to content

Commit

Permalink
Merge pull request #20297 from vespa-engine/hmusum/create-FileDownloa…
Browse files Browse the repository at this point in the history
…der-once-for-maintainer

Create FileDownloader once [run-systemtest]
  • Loading branch information
Harald Musum authored Nov 30, 2021
2 parents c105ea3 + 15e5edc commit 318edd1
Showing 1 changed file with 47 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.yahoo.config.subscription.ConfigSourceSet;
import com.yahoo.jrt.Supervisor;
import com.yahoo.jrt.Transport;
import com.yahoo.vespa.config.ConnectionPool;
import com.yahoo.vespa.config.JRTConnectionPool;
import com.yahoo.vespa.config.server.ApplicationRepository;
import com.yahoo.vespa.config.server.session.Session;
Expand All @@ -22,6 +23,7 @@

import java.io.File;
import java.time.Duration;
import java.util.List;
import java.util.logging.Logger;

import static com.yahoo.vespa.config.server.filedistribution.FileDistributionUtil.fileReferenceExistsOnDisk;
Expand All @@ -41,9 +43,8 @@ public class ApplicationPackageMaintainer extends ConfigServerMaintainer {
private final ApplicationRepository applicationRepository;
private final File downloadDirectory;
private final ConfigserverConfig configserverConfig;
private final Supervisor supervisor;
private final boolean useFileDistributionConnectionPool;

private final Supervisor supervisor = new Supervisor(new Transport("filedistribution-pool")).setDropEmptyBuffers(true);
private final FileDownloader fileDownloader;

ApplicationPackageMaintainer(ApplicationRepository applicationRepository,
Curator curator,
Expand All @@ -52,9 +53,12 @@ public class ApplicationPackageMaintainer extends ConfigServerMaintainer {
super(applicationRepository, curator, flagSource, applicationRepository.clock().instant(), interval, false);
this.applicationRepository = applicationRepository;
this.configserverConfig = applicationRepository.configserverConfig();
this.supervisor = new Supervisor(new Transport("filedistribution-pool")).setDropEmptyBuffers(true);
this.downloadDirectory = new File(Defaults.getDefaults().underVespaHome(configserverConfig.fileReferencesDir()));
this.useFileDistributionConnectionPool = Flags.USE_FILE_DISTRIBUTION_CONNECTION_POOL.bindTo(flagSource).value();
boolean useFileDistributionConnectionPool = Flags.USE_FILE_DISTRIBUTION_CONNECTION_POOL.bindTo(flagSource).value();
this.fileDownloader = createFileDownloader(configserverConfig,
useFileDistributionConnectionPool,
downloadDirectory,
supervisor);
}

@Override
Expand All @@ -64,49 +68,56 @@ protected double maintain() {
int attempts = 0;
int failures = 0;

try (var fileDownloader = createFileDownloader()) {
for (var applicationId : applicationRepository.listApplications()) {
log.finest(() -> "Verifying application package for " + applicationId);
Session session = applicationRepository.getActiveSession(applicationId);
if (session == null) continue; // App might be deleted after call to listApplications() or not activated yet (bootstrap phase)

FileReference appFileReference = session.getApplicationPackageReference();
if (appFileReference != null) {
long sessionId = session.getSessionId();
attempts++;
if (! fileReferenceExistsOnDisk(downloadDirectory, appFileReference)) {
log.fine(() -> "Downloading application package for " + applicationId + " (session " + sessionId + ")");

FileReferenceDownload download = new FileReferenceDownload(appFileReference,
false,
this.getClass().getSimpleName());
if (fileDownloader.getFile(download).isEmpty()) {
failures++;
log.info("Failed downloading application package (" + appFileReference + ")" +
" for " + applicationId + " (session " + sessionId + ")");
continue;
}
for (var applicationId : applicationRepository.listApplications()) {
log.finest(() -> "Verifying application package for " + applicationId);
Session session = applicationRepository.getActiveSession(applicationId);
if (session == null)
continue; // App might be deleted after call to listApplications() or not activated yet (bootstrap phase)

FileReference appFileReference = session.getApplicationPackageReference();
if (appFileReference != null) {
long sessionId = session.getSessionId();
attempts++;
if (!fileReferenceExistsOnDisk(downloadDirectory, appFileReference)) {
log.fine(() -> "Downloading application package for " + applicationId + " (session " + sessionId + ")");

FileReferenceDownload download = new FileReferenceDownload(appFileReference,
false,
this.getClass().getSimpleName());
if (fileDownloader.getFile(download).isEmpty()) {
failures++;
log.info("Failed downloading application package (" + appFileReference + ")" +
" for " + applicationId + " (session " + sessionId + ")");
continue;
}
createLocalSessionIfMissing(applicationId, sessionId);
}
createLocalSessionIfMissing(applicationId, sessionId);
}
}
return asSuccessFactor(attempts, failures);
}

private FileDownloader createFileDownloader() {
ConfigSourceSet configSourceSet = new ConfigSourceSet(getOtherConfigServersInCluster(configserverConfig));
return new FileDownloader(useFileDistributionConnectionPool
? new FileDistributionConnectionPool(configSourceSet, supervisor)
: new JRTConnectionPool(configSourceSet, supervisor),
supervisor,
downloadDirectory,
Duration.ofSeconds(30));
private static FileDownloader createFileDownloader(ConfigserverConfig configserverConfig,
boolean useFileDistributionConnectionPool,
File downloadDirectory,
Supervisor supervisor) {
List<String> otherConfigServersInCluster = getOtherConfigServersInCluster(configserverConfig);
ConfigSourceSet configSourceSet = new ConfigSourceSet(otherConfigServersInCluster);

ConnectionPool connectionPool;
if (otherConfigServersInCluster.isEmpty())
connectionPool = FileDownloader.emptyConnectionPool();
else
connectionPool = useFileDistributionConnectionPool
? new FileDistributionConnectionPool(configSourceSet, supervisor)
: new JRTConnectionPool(configSourceSet, supervisor);
return new FileDownloader(connectionPool, supervisor, downloadDirectory, Duration.ofSeconds(30));
}

@Override
public void awaitShutdown() {
supervisor.transport().shutdown().join();
fileDownloader.close();
super.awaitShutdown();
}

Expand Down

0 comments on commit 318edd1

Please sign in to comment.