diff --git a/src/hydra-queue-runner/dispatcher.cc b/src/hydra-queue-runner/dispatcher.cc index 531c6b46b..eda6db0f1 100644 --- a/src/hydra-queue-runner/dispatcher.cc +++ b/src/hydra-queue-runner/dispatcher.cc @@ -340,6 +340,9 @@ State::MachineReservation::MachineReservation(State & state, Step::ptr step, Mac { machine->state->currentJobs++; + for (auto & f : step->requiredSystemFeatures) + machine->state->featuresConsumption[f.first] += f.second; + { auto machineTypes_(state.machineTypes.lock()); (*machineTypes_)[step->systemType].running++; @@ -354,6 +357,9 @@ State::MachineReservation::~MachineReservation() if (prev == 1) machine->state->idleSince = time(0); + for (auto & f : step->requiredSystemFeatures) + machine->state->featuresConsumption[f.first] -= f.second; + { auto machineTypes_(state.machineTypes.lock()); auto & machineType = (*machineTypes_)[step->systemType]; diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc index 70329a9e5..9f0fee265 100644 --- a/src/hydra-queue-runner/hydra-queue-runner.cc +++ b/src/hydra-queue-runner/hydra-queue-runner.cc @@ -1,5 +1,6 @@ #include #include +#include #include #include @@ -109,12 +110,30 @@ void State::parseMachines(const std::string & contents) else machine->maxJobs = 1; machine->speedFactor = atof(tokens[4].c_str()); + if (tokens[5] == "-") tokens[5] = ""; - machine->supportedFeatures = tokenizeString(tokens[5], ","); + for (auto & rf : tokenizeString(tokens[5], ",")) { + auto ft = tokenizeString>(rf, ":"); + unsigned int amount = UINT_MAX; + if (ft.size() == 2) // consumable info provided + amount = std::stoul(ft.back()); + if (amount == 0) + amount = UINT_MAX; + machine->supportedFeatures[ft.front()] = amount; + } + if (tokens[6] == "-") tokens[6] = ""; - machine->mandatoryFeatures = tokenizeString(tokens[6], ","); - for (auto & f : machine->mandatoryFeatures) - machine->supportedFeatures.insert(f); + for (auto & rf : tokenizeString(tokens[6], ",")) { + auto ft = tokenizeString>(rf, ":"); + unsigned int amount = UINT_MAX; + if (ft.size() == 2) // consumable info provided + amount = std::stoul(ft.back()); + if (amount == 0) + amount = UINT_MAX; + machine->mandatoryFeatures[ft.front()] = amount; + machine->supportedFeatures[ft.front()] = amount; + } + if (tokens[7] != "" && tokens[7] != "-") machine->sshPublicHostKey = base64Decode(tokens[7]); diff --git a/src/hydra-queue-runner/queue-monitor.cc b/src/hydra-queue-runner/queue-monitor.cc index 862925b2c..81419fba4 100644 --- a/src/hydra-queue-runner/queue-monitor.cc +++ b/src/hydra-queue-runner/queue-monitor.cc @@ -435,14 +435,24 @@ Step::ptr State::createStep(ref destStore, step->systemType = step->drv.platform; { auto i = step->drv.env.find("requiredSystemFeatures"); - StringSet features; - if (i != step->drv.env.end()) - features = step->requiredSystemFeatures = tokenizeString>(i->second); + StringSet rawFeatures; + std::map features; + if (i != step->drv.env.end()) { + rawFeatures = tokenizeString>(i->second); + for (auto & rf : rawFeatures) { + auto ft = tokenizeString>(rf, ":"); + unsigned int amount = 1; + if (ft.size() == 2) // consumable info provided + amount = std::stoul(ft.back()); + features[ft.front()] = amount; + } + step->requiredSystemFeatures = features; + } if (step->preferLocalBuild) - features.insert("local"); + features["local"] = 1; if (!features.empty()) { step->systemType += ":"; - step->systemType += concatStringsSep(",", features); + step->systemType += concatStringsSep(",", rawFeatures); } } diff --git a/src/hydra-queue-runner/state.hh b/src/hydra-queue-runner/state.hh index 0e91ab565..180139548 100644 --- a/src/hydra-queue-runner/state.hh +++ b/src/hydra-queue-runner/state.hh @@ -152,7 +152,7 @@ struct Step nix::Path drvPath; nix::Derivation drv; - std::set requiredSystemFeatures; + std::map requiredSystemFeatures; bool preferLocalBuild; bool isDeterministic; std::string systemType; // concatenation of drv.platform and requiredSystemFeatures @@ -220,7 +220,8 @@ struct Machine bool enabled{true}; std::string sshName, sshKey; - std::set systemTypes, supportedFeatures, mandatoryFeatures; + std::set systemTypes; + std::map supportedFeatures, mandatoryFeatures; unsigned int maxJobs = 1; float speedFactor = 1.0; std::string sshPublicHostKey; @@ -228,6 +229,7 @@ struct Machine struct State { typedef std::shared_ptr ptr; counter currentJobs{0}; + std::map featuresConsumption; counter nrStepsDone{0}; counter totalStepTime{0}; // total time for steps, including closure copying counter totalStepBuildTime{0}; // total build time for steps @@ -254,7 +256,7 @@ struct Machine if (!systemTypes.count(step->drv.platform == "builtin" ? nix::settings.thisSystem : step->drv.platform)) return false; - /* Check that the step requires all mandatory features of this + /* Check that the step requires/consumes all mandatory features of this machine. (Thus, a machine with the mandatory "benchmark" feature will *only* execute steps that require "benchmark".) The "preferLocalBuild" bit of a step is @@ -262,14 +264,19 @@ struct Machine "local" as a mandatory feature will only do preferLocalBuild steps. */ for (auto & f : mandatoryFeatures) - if (!step->requiredSystemFeatures.count(f) - && !(f == "local" && step->preferLocalBuild)) + if (step->requiredSystemFeatures.find(f.first) == step->requiredSystemFeatures.end() + && !(f.first == "local" && step->preferLocalBuild)) return false; /* Check that the machine supports all features required by - the step. */ - for (auto & f : step->requiredSystemFeatures) - if (!supportedFeatures.count(f)) return false; + the step, and has enough of each available. */ + for (auto & f : step->requiredSystemFeatures) { + if (supportedFeatures.find(f.first) == supportedFeatures.end()) + return false; + unsigned long current = state->featuresConsumption[f.first]; + if (current + f.second > supportedFeatures[f.first]) + return false; + } return true; }