Merge pull request #824 from conveyal/spot-instance-startup
Refine spot instance startup
ansoncfit authored Sep 27, 2022
2 parents 84254c7 + aab0166 commit 7765f0b
Showing 1 changed file with 32 additions and 5 deletions.
src/main/java/com/conveyal/analysis/components/broker/Broker.java
@@ -108,7 +108,8 @@ public interface Config {
      * Used when auto-starting spot instances. Set to a smaller value to increase the number of
      * workers requested automatically
      */
-    public final int TARGET_TASKS_PER_WORKER = 800;
+    public final int TARGET_TASKS_PER_WORKER_TRANSIT = 800;
+    public final int TARGET_TASKS_PER_WORKER_NONTRANSIT = 4_000;

     /**
      * We want to request spot instances to "boost" regional analyses after a few regional task
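
A rough worked example of how these targets behave (illustrative task counts, not from this commit): a transit job with 40,000 origins implies 40,000 / 800 = 50 workers before the zoom-based scale factor introduced further down in this diff is applied, while the same job without transit implies only 40,000 / 4,000 = 10 workers, reflecting how much cheaper non-transit tasks are to compute.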
@@ -243,7 +244,8 @@ public void createOnDemandWorkerInCategory(WorkerCategory category, WorkerTags workerTags) {
     /**
      * Create on-demand/spot workers for a given job, after certain checks
      * @param nOnDemand EC2 on-demand instances to request
-     * @param nSpot EC2 spot instances to request
+     * @param nSpot Target number of EC2 spot instances to request. The actual number requested may be lower if the
+     *              total number of workers running is approaching the maximum specified in the Broker config.
      */
     public void createWorkersInCategory (WorkerCategory category, WorkerTags workerTags, int nOnDemand, int nSpot) {

@@ -257,6 +259,13 @@ public void createWorkersInCategory (WorkerCategory category, WorkerTags workerTags, int nOnDemand, int nSpot) {
             return;
         }

+        // Backoff: reduce the nSpot requested when the number of already running workers starts approaching the
+        // configured maximum
+        if (workerCatalog.totalWorkerCount() * 2 > config.maxWorkers()) {
+            nSpot = Math.min(nSpot, (config.maxWorkers() - workerCatalog.totalWorkerCount()) / 2);
+            LOG.info("Worker pool over half of maximum size. Number of new spot instances set to {}", nSpot);
+        }
+
         if (workerCatalog.totalWorkerCount() + nOnDemand + nSpot >= config.maxWorkers()) {
             String message = String.format(
                     "Maximum of %d workers already started, not starting more;" +
@@ -483,9 +492,27 @@ private void requestExtraWorkersIfAppropriate(Job job) {
         WorkerCategory workerCategory = job.workerCategory;
         int categoryWorkersAlreadyRunning = workerCatalog.countWorkersInCategory(workerCategory);
         if (categoryWorkersAlreadyRunning < MAX_WORKERS_PER_CATEGORY) {
-            // Start a number of workers that scales with the number of total tasks, up to a fixed number.
-            // TODO more refined determination of number of workers to start (e.g. using tasks per minute)
-            int targetWorkerTotal = Math.min(MAX_WORKERS_PER_CATEGORY, job.nTasksTotal / TARGET_TASKS_PER_WORKER);
+            // TODO more refined determination of number of workers to start (e.g. using observed tasks per minute
+            // for recently completed tasks -- but what about when initial origins are in a desert/ocean?)
+            int targetWorkerTotal;
+            if (job.templateTask.hasTransit()) {
+                // Total computation for a task with transit depends on the number of stops and whether the
+                // network has frequency-based routes. The total computation for the job depends on these
+                // factors as well as the number of tasks (origins). Zoom levels add a complication: the number of
+                // origins becomes an even poorer proxy for the number of stops. We use a scale factor to compensate
+                // -- all else equal, high zoom levels imply fewer stops per origin (task) and a lower ideal target
+                // for number of workers. TODO reduce scale factor further when there are no frequency routes. But is
+                // this worth adding a field to Job or RegionalTask?
+                float transitScaleFactor = (9f / job.templateTask.zoom);
+                targetWorkerTotal = (int) ((job.nTasksTotal / TARGET_TASKS_PER_WORKER_TRANSIT) * transitScaleFactor);
+            } else {
+                // Tasks without transit are simpler. They complete relatively quickly, and the total computation for
+                // the job increases roughly linearly with the number of origins.
+                targetWorkerTotal = job.nTasksTotal / TARGET_TASKS_PER_WORKER_NONTRANSIT;
+            }
+
+            // Do not exceed the limit on workers per category TODO add similar limit per accessGroup or user
+            targetWorkerTotal = Math.min(targetWorkerTotal, MAX_WORKERS_PER_CATEGORY);
             // Guardrail until freeform pointsets are tested more thoroughly
             if (job.templateTask.originPointSet != null) targetWorkerTotal = Math.min(targetWorkerTotal, 5);
             int nSpot = targetWorkerTotal - categoryWorkersAlreadyRunning;
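
The new sizing arithmetic is compact enough to sanity-check in isolation. Below is a minimal standalone sketch of the same computation (constants copied from this diff; the helper method, its signature, and the MAX_WORKERS_PER_CATEGORY value are illustrative stand-ins, not code from Broker):

import static java.lang.Math.min;

public class WorkerTargetSketch {
    // Constants copied from the diff above.
    static final int TARGET_TASKS_PER_WORKER_TRANSIT = 800;
    static final int TARGET_TASKS_PER_WORKER_NONTRANSIT = 4_000;
    // Illustrative stand-in; the real per-category limit lives in Broker.
    static final int MAX_WORKERS_PER_CATEGORY = 250;

    // Mirrors the targetWorkerTotal computation in requestExtraWorkersIfAppropriate.
    static int targetWorkerTotal (int nTasksTotal, boolean hasTransit, int zoom) {
        int target;
        if (hasTransit) {
            // Integer division first, then the float zoom correction, matching the diff.
            float transitScaleFactor = 9f / zoom;
            target = (int) ((nTasksTotal / TARGET_TASKS_PER_WORKER_TRANSIT) * transitScaleFactor);
        } else {
            target = nTasksTotal / TARGET_TASKS_PER_WORKER_NONTRANSIT;
        }
        return min(target, MAX_WORKERS_PER_CATEGORY);
    }

    public static void main (String[] args) {
        System.out.println(targetWorkerTotal(24_000, true, 12));  // (int) (30 * 0.75) = 22
        System.out.println(targetWorkerTotal(24_000, true, 9));   // (int) (30 * 1.0)  = 30
        System.out.println(targetWorkerTotal(24_000, false, 12)); // 24000 / 4000      = 6
    }
}

Note that the integer division means a transit job with fewer than 800 tasks targets zero extra workers from this path, and since the scale factor is 1 at zoom 9 and below 1 at higher zooms, it reduces the target whenever the grid is finer than zoom 9.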