Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 32 additions & 28 deletions src/hydra-queue-runner/build-remote.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,23 @@

using namespace nix;

namespace nix::build_remote {

static Strings extraStoreArgs(std::string & machine)
bool ::Machine::isLocalhost() const
{
Strings result;
try {
auto parsed = parseURL(machine);
if (parsed.scheme != "ssh") {
throw SysError("Currently, only (legacy-)ssh stores are supported!");
}
machine = parsed.authority.value_or("");
auto remoteStore = parsed.query.find("remote-store");
if (remoteStore != parsed.query.end()) {
result = {"--store", shellEscape(remoteStore->second)};
}
} catch (BadURL &) {
// We just try to continue with `machine->sshName` here for backwards compat.
}

return result;
return storeUri.params.empty() && std::visit(overloaded {
[](const StoreReference::Auto &) {
return true;
},
[](const StoreReference::Specified & s) {
return
(s.scheme == "local" || s.scheme == "unix") ||
((s.scheme == "ssh" || s.scheme == "ssh-ng") &&
s.authority == "localhost");
},
}, storeUri.variant);
}

namespace nix::build_remote {

static std::unique_ptr<SSHMaster::Connection> openConnection(
::Machine::ptr machine, SSHMaster & master)
{
Expand All @@ -51,7 +46,11 @@ static std::unique_ptr<SSHMaster::Connection> openConnection(
command.push_back("--builders");
command.push_back("");
} else {
command.splice(command.end(), extraStoreArgs(machine->sshName));
auto remoteStore = machine->storeUri.params.find("remote-store");
if (remoteStore != machine->storeUri.params.end()) {
command.push_back("--store");
command.push_back(shellEscape(remoteStore->second));
}
}

return master.startCommand(std::move(command), {
Expand Down Expand Up @@ -187,7 +186,7 @@ static BasicDerivation sendInputs(
MaintainCount<counter> mc2(nrStepsCopyingTo);

printMsg(lvlDebug, "sending closure of ‘%s’ to ‘%s’",
localStore.printStorePath(step.drvPath), conn.machine->sshName);
localStore.printStorePath(step.drvPath), conn.machine->storeUri.render());

auto now1 = std::chrono::steady_clock::now();

Expand Down Expand Up @@ -393,8 +392,13 @@ void State::buildRemote(ref<Store> destStore,

updateStep(ssConnecting);

auto * pSpecified = std::get_if<StoreReference::Specified>(&machine->storeUri.variant);
if (!pSpecified || pSpecified->scheme != "ssh") {
throw Error("Currently, only (legacy-)ssh stores are supported!");
}

SSHMaster master {
machine->sshName,
pSpecified->authority,
machine->sshKey,
machine->sshPublicHostKey,
false, // no SSH master yet
Expand Down Expand Up @@ -445,11 +449,11 @@ void State::buildRemote(ref<Store> destStore,
conn.to,
conn.from,
our_version,
machine->sshName);
machine->storeUri.render());
} catch (EndOfFile & e) {
child->sshPid.wait();
std::string s = chomp(readFile(result.logFile));
throw Error("cannot connect to ‘%1%’: %2%", machine->sshName, s);
throw Error("cannot connect to ‘%1%’: %2%", machine->storeUri.render(), s);
}

{
Expand Down Expand Up @@ -480,7 +484,7 @@ void State::buildRemote(ref<Store> destStore,
/* Do the build. */
printMsg(lvlDebug, "building ‘%s’ on ‘%s’",
localStore->printStorePath(step->drvPath),
machine->sshName);
machine->storeUri.render());

updateStep(ssBuilding);

Expand All @@ -503,7 +507,7 @@ void State::buildRemote(ref<Store> destStore,
get a build log. */
if (result.isCached) {
printMsg(lvlInfo, "outputs of ‘%s’ substituted or already valid on ‘%s’",
localStore->printStorePath(step->drvPath), machine->sshName);
localStore->printStorePath(step->drvPath), machine->storeUri.render());
unlink(result.logFile.c_str());
result.logFile = "";
}
Expand Down Expand Up @@ -532,7 +536,7 @@ void State::buildRemote(ref<Store> destStore,

/* Copy each path. */
printMsg(lvlDebug, "copying outputs of ‘%s’ from ‘%s’ (%d bytes)",
localStore->printStorePath(step->drvPath), machine->sshName, totalNarSize);
localStore->printStorePath(step->drvPath), machine->storeUri.render(), totalNarSize);

build_remote::copyPathsFromRemote(conn, narMembers, *localStore, *destStore, infos);
auto now2 = std::chrono::steady_clock::now();
Expand Down Expand Up @@ -571,7 +575,7 @@ void State::buildRemote(ref<Store> destStore,
info->consecutiveFailures = std::min(info->consecutiveFailures + 1, (unsigned int) 4);
info->lastFailure = now;
int delta = retryInterval * std::pow(retryBackoff, info->consecutiveFailures - 1) + (rand() % 30);
printMsg(lvlInfo, "will disable machine ‘%1%’ for %2%s", machine->sshName, delta);
printMsg(lvlInfo, "will disable machine ‘%1%’ for %2%s", machine->storeUri.render(), delta);
info->disabledUntil = now + std::chrono::seconds(delta);
}
throw;
Expand Down
12 changes: 6 additions & 6 deletions src/hydra-queue-runner/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ void State::builder(MachineReservation::ptr reservation)
} catch (std::exception & e) {
printMsg(lvlError, "uncaught exception building ‘%s’ on ‘%s’: %s",
localStore->printStorePath(reservation->step->drvPath),
reservation->machine->sshName,
reservation->machine->storeUri.render(),
e.what());
}
}
Expand Down Expand Up @@ -150,7 +150,7 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
buildOptions.buildTimeout = build->buildTimeout;

printInfo("performing step ‘%s’ %d times on ‘%s’ (needed by build %d and %d others)",
localStore->printStorePath(step->drvPath), buildOptions.nrRepeats + 1, machine->sshName, buildId, (dependents.size() - 1));
localStore->printStorePath(step->drvPath), buildOptions.nrRepeats + 1, machine->storeUri.render(), buildId, (dependents.size() - 1));
}

if (!buildOneDone)
Expand Down Expand Up @@ -196,7 +196,7 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
{
auto mc = startDbUpdate();
pqxx::work txn(*conn);
stepNr = createBuildStep(txn, result.startTime, buildId, step, machine->sshName, bsBusy);
stepNr = createBuildStep(txn, result.startTime, buildId, step, machine->storeUri.render(), bsBusy);
txn.commit();
}

Expand Down Expand Up @@ -253,15 +253,15 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
/* Finish the step in the database. */
if (stepNr) {
pqxx::work txn(*conn);
finishBuildStep(txn, result, buildId, stepNr, machine->sshName);
finishBuildStep(txn, result, buildId, stepNr, machine->storeUri.render());
txn.commit();
}

/* The step had a hopefully temporary failure (e.g. network
issue). Retry a number of times. */
if (result.canRetry) {
printMsg(lvlError, "possibly transient failure building ‘%s’ on ‘%s’: %s",
localStore->printStorePath(step->drvPath), machine->sshName, result.errorMsg);
localStore->printStorePath(step->drvPath), machine->storeUri.render(), result.errorMsg);
assert(stepNr);
bool retry;
{
Expand Down Expand Up @@ -452,7 +452,7 @@ void State::failStep(
build->finishedInDB)
continue;
createBuildStep(txn,
0, build->id, step, machine ? machine->sshName : "",
0, build->id, step, machine ? machine->storeUri.render() : "",
result.stepStatus, result.errorMsg, buildId == build->id ? 0 : buildId);
}

Expand Down
2 changes: 1 addition & 1 deletion src/hydra-queue-runner/dispatcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ system_time State::doDispatch()
/* Can this machine do this step? */
if (!mi.machine->supportsStep(step)) {
debug("machine '%s' does not support step '%s' (system type '%s')",
mi.machine->sshName, localStore->printStorePath(step->drvPath), step->drv->platform);
mi.machine->storeUri.render(), localStore->printStorePath(step->drvPath), step->drv->platform);
continue;
}

Expand Down
18 changes: 8 additions & 10 deletions src/hydra-queue-runner/hydra-queue-runner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@ void State::parseMachines(const std::string & contents)
using MaxJobs = std::remove_const<decltype(nix::Machine::maxJobs)>::type;

auto machine = std::make_shared<::Machine>(nix::Machine {
// `storeUri`, not yet used
"",
// `storeUri`
tokens[0],
// `systemTypes`
tokenizeString<StringSet>(tokens[1], ","),
// `sshKey`
Expand All @@ -175,25 +175,23 @@ void State::parseMachines(const std::string & contents)
: "",
});

machine->sshName = tokens[0];

/* Re-use the State object of the previous machine with the
same name. */
auto i = oldMachines.find(machine->sshName);
auto i = oldMachines.find(machine->storeUri.variant);
if (i == oldMachines.end())
printMsg(lvlChatty, "adding new machine ‘%1%’", machine->sshName);
printMsg(lvlChatty, "adding new machine ‘%1%’", machine->storeUri.render());
else
printMsg(lvlChatty, "updating machine ‘%1%’", machine->sshName);
printMsg(lvlChatty, "updating machine ‘%1%’", machine->storeUri.render());
machine->state = i == oldMachines.end()
? std::make_shared<::Machine::State>()
: i->second->state;
newMachines[machine->sshName] = machine;
newMachines[machine->storeUri.variant] = machine;
}

for (auto & m : oldMachines)
if (newMachines.find(m.first) == newMachines.end()) {
if (m.second->enabled)
printInfo("removing machine ‘%1%’", m.first);
printInfo("removing machine ‘%1%’", m.second->storeUri.render());
/* Add a disabled ::Machine object to make sure stats are
maintained. */
auto machine = std::make_shared<::Machine>(*(m.second));
Expand Down Expand Up @@ -657,7 +655,7 @@ void State::dumpStatus(Connection & conn)
machine["avgStepTime"] = (float) s->totalStepTime / s->nrStepsDone;
machine["avgStepBuildTime"] = (float) s->totalStepBuildTime / s->nrStepsDone;
}
statusJson["machines"][m->sshName] = machine;
statusJson["machines"][m->storeUri.render()] = machine;
}
}

Expand Down
13 changes: 2 additions & 11 deletions src/hydra-queue-runner/state.hh
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
#include <map>
#include <memory>
#include <queue>
#include <regex>

#include <prometheus/counter.h>
#include <prometheus/gauge.h>
Expand Down Expand Up @@ -240,10 +239,6 @@ struct Machine : nix::Machine
{
typedef std::shared_ptr<Machine> ptr;

/* TODO Get rid of: `nix::Machine::storeUri` is normalized in a way
we are not yet used to, but once we are, we don't need this. */
std::string sshName;

struct State {
typedef std::shared_ptr<State> ptr;
counter currentJobs{0};
Expand Down Expand Up @@ -293,11 +288,7 @@ struct Machine : nix::Machine
return true;
}

bool isLocalhost()
{
std::regex r("^(ssh://|ssh-ng://)?localhost$");
return std::regex_search(sshName, r);
}
bool isLocalhost() const;

// A connection to a machine
struct Connection : nix::ServeProto::BasicClientConnection {
Expand Down Expand Up @@ -357,7 +348,7 @@ private:

/* The build machines. */
std::mutex machinesReadyLock;
typedef std::map<std::string, Machine::ptr> Machines;
typedef std::map<nix::StoreReference::Variant, Machine::ptr> Machines;
nix::Sync<Machines> machines; // FIXME: use atomic_shared_ptr

/* Various stats. */
Expand Down