diff --git a/src/main/java/com/conveyal/analysis/components/broker/Broker.java b/src/main/java/com/conveyal/analysis/components/broker/Broker.java index f89f763b0..ee1193b3d 100644 --- a/src/main/java/com/conveyal/analysis/components/broker/Broker.java +++ b/src/main/java/com/conveyal/analysis/components/broker/Broker.java @@ -108,7 +108,8 @@ public interface Config { * Used when auto-starting spot instances. Set to a smaller value to increase the number of * workers requested automatically */ - public final int TARGET_TASKS_PER_WORKER = 800; + public final int TARGET_TASKS_PER_WORKER_TRANSIT = 800; + public final int TARGET_TASKS_PER_WORKER_NONTRANSIT = 4_000; /** * We want to request spot instances to "boost" regional analyses after a few regional task @@ -243,7 +244,8 @@ public void createOnDemandWorkerInCategory(WorkerCategory category, WorkerTags w /** * Create on-demand/spot workers for a given job, after certain checks * @param nOnDemand EC2 on-demand instances to request - * @param nSpot EC2 spot instances to request + * @param nSpot Target number of EC2 spot instances to request. The actual number requested may be lower if the + * total number of workers running is approaching the maximum specified in the Broker config. */ public void createWorkersInCategory (WorkerCategory category, WorkerTags workerTags, int nOnDemand, int nSpot) { @@ -257,6 +259,13 @@ public void createWorkersInCategory (WorkerCategory category, WorkerTags workerT return; } + // Backoff: reduce the nSpot requested when the number of already running workers starts approaching the + // configured maximum + if (workerCatalog.totalWorkerCount() * 2 > config.maxWorkers()) { + nSpot = Math.min(nSpot, (config.maxWorkers() - workerCatalog.totalWorkerCount()) / 2); + LOG.info("Worker pool over half of maximum size. 
Number of new spot instances set to {}", nSpot); + } + if (workerCatalog.totalWorkerCount() + nOnDemand + nSpot >= config.maxWorkers()) { String message = String.format( "Maximum of %d workers already started, not starting more;" + @@ -483,9 +492,27 @@ private void requestExtraWorkersIfAppropriate(Job job) { WorkerCategory workerCategory = job.workerCategory; int categoryWorkersAlreadyRunning = workerCatalog.countWorkersInCategory(workerCategory); if (categoryWorkersAlreadyRunning < MAX_WORKERS_PER_CATEGORY) { - // Start a number of workers that scales with the number of total tasks, up to a fixed number. - // TODO more refined determination of number of workers to start (e.g. using tasks per minute) - int targetWorkerTotal = Math.min(MAX_WORKERS_PER_CATEGORY, job.nTasksTotal / TARGET_TASKS_PER_WORKER); + // TODO more refined determination of number of workers to start (e.g. using observed tasks per minute + // for recently completed tasks -- but what about when initial origins are in a desert/ocean?) + int targetWorkerTotal; + if (job.templateTask.hasTransit()) { + // Total computation for a task with transit depends on the number of stops and whether the + // network has frequency-based routes. The total computation for the job depends on these + // factors as well as the number of tasks (origins). Zoom levels add a complication: the number of + // origins becomes an even poorer proxy for the number of stops. We use a scale factor to compensate + // -- all else equal, high zoom levels imply fewer stops per origin (task) and a lower ideal target + // for number of workers. TODO reduce scale factor further when there are no frequency routes. But is + // this worth adding a field to Job or RegionalTask? + float transitScaleFactor = (9f / job.templateTask.zoom); + targetWorkerTotal = (int) ((job.nTasksTotal / TARGET_TASKS_PER_WORKER_TRANSIT) * transitScaleFactor); + } else { + // Tasks without transit are simpler. 
They complete relatively quickly, and the total computation for + // the job increases roughly linearly with the number of origins. + targetWorkerTotal = job.nTasksTotal / TARGET_TASKS_PER_WORKER_NONTRANSIT; + } + + // Do not exceed the limit on workers per category TODO add similar limit per accessGroup or user + targetWorkerTotal = Math.min(targetWorkerTotal, MAX_WORKERS_PER_CATEGORY); // Guardrail until freeform pointsets are tested more thoroughly if (job.templateTask.originPointSet != null) targetWorkerTotal = Math.min(targetWorkerTotal, 5); int nSpot = targetWorkerTotal - categoryWorkersAlreadyRunning;