Skip to content

Commit

Permalink
feat: add job retry on agent connection error
Browse files Browse the repository at this point in the history
  • Loading branch information
ineedfat committed Oct 28, 2019
1 parent af061af commit be1d6c6
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 7 deletions.
26 changes: 24 additions & 2 deletions packages/@best/agent-hub/src/Agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import BenchmarkJob from "./BenchmarkJob";
import { EventEmitter} from "events";
import socketIO from "socket.io-client";
import http from 'http';
import https from 'https';
// @todo: use this indirectly... make an abstraction for the runner in agent
import SocketIOFile from "@best/runner-remote/build/file-uploader";
import { BenchmarkResultsSnapshot, BenchmarkResultsState, BenchmarkRuntimeConfig } from "@best/types";
Expand Down Expand Up @@ -77,6 +79,11 @@ export class Agent extends EventEmitter {
}
this.status = AgentStatus.RunningJob;

if (!await this.isAlive()) {
this.status = AgentStatus.Offline;
throw new Error(AGENT_CONNECTION_ERROR);
}

// load the tar file...
try {
await loadBenchmarkJob(job);
Expand All @@ -85,7 +92,7 @@ export class Agent extends EventEmitter {
this._logger.event(job.socketConnection.id, 'benchmark error', err, false);
job.socketConnection.emit('benchmark_error', err);
this.status = AgentStatus.Idle;
return ;
return;
}

// eventually this can become a runner...
Expand All @@ -98,7 +105,7 @@ export class Agent extends EventEmitter {
// @todo: move to failures queue
if (err.message === AGENT_CONNECTION_ERROR) {
this.status = AgentStatus.Offline;
// TODO: in this case, we need to re-run the job on a different agent (if we have one)
throw err;
} else {
this.status = AgentStatus.Idle;
}
Expand All @@ -118,6 +125,21 @@ export class Agent extends EventEmitter {
return this.status === AgentStatus.Idle;
}

/**
 * Probes the agent's host with a single HTTP(S) GET to decide whether it is reachable.
 *
 * The transport module is chosen from the prefix of `this._config.host`:
 * a case-insensitive `https` prefix selects `https`, anything else selects `http`.
 *
 * @returns A promise resolving to `true` only when the host answers with status
 *          code 200; it resolves to `false` for any other status, for a request
 *          error, or when the probe times out. The promise never rejects.
 */
async isAlive(): Promise<boolean> {
    // Upper bound for the probe so an unresponsive host cannot stall the caller forever.
    const PROBE_TIMEOUT_MS = 10000;
    const host = this._config.host;
    const transport = host.toLowerCase().startsWith('https') ? https : http;

    return new Promise<boolean>((resolve) => {
        // `get` already calls `end()` on the request, so no explicit `.end()` is needed.
        const request = transport.get(host, res => {
            // The body is irrelevant here, but it must be drained so the
            // underlying socket is released instead of lingering.
            res.resume();
            resolve(res.statusCode === 200);
        });

        request.on('error', () => resolve(false));

        request.setTimeout(PROBE_TIMEOUT_MS, () => {
            resolve(false);
            // Tear the request down; the resulting 'error' event is harmless
            // because the promise has already been settled.
            request.destroy();
        });
    });
}

private async proxifyJob(job: BenchmarkJob) {
const self = this;
const remoteAgentRunnerConfig = Object.assign(
Expand Down
4 changes: 1 addition & 3 deletions packages/@best/agent-hub/src/AgentManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@ export class AgentManager extends EventEmitter {

/**
 * Picks an agent that can take the given job right now.
 *
 * @param job The benchmark job waiting to be scheduled; its `spec` is matched
 *            against each agent via `canRunJob`.
 * @returns The first agent in `this.agents` that is idle and can run the job's
 *          spec, or `null` when no such agent exists.
 */
getIdleAgentForJob(job: BenchmarkJob): Agent | null {
    // @todo: organize Agents by category.
    // `find` stops at the first match instead of filtering the whole list.
    return this.agents.find(agent => agent.isIdle() && agent.canRunJob(job.spec)) || null;
}

existAgentWithSpec(spec: Spec): boolean {
Expand Down
8 changes: 6 additions & 2 deletions packages/@best/agent-hub/src/HubApplication.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,9 @@ export class HubApplication {

if (agent !== null) {
this._incomingQueue.remove(job);
agent.runJob(job);
agent.runJob(job).catch(err => {
this._incomingQueue.push(job);
});
this._logger.event("Hub", 'PENDING_JOB_CHANGED');
} else {
job.socketConnection.emit('benchmark_enqueued', { pending: this._incomingQueue.size });
Expand Down Expand Up @@ -219,7 +221,9 @@ export class HubApplication {
if (agent.canRunJob(job!.spec)) {
this._incomingQueue.remove(job!);
this._logger.event("Hub", 'PENDING_JOB_CHANGED');
agent.runJob(job!);
agent.runJob(job!).catch( (err) => {
this._incomingQueue.push(job!);
});
break;
}
}
Expand Down

0 comments on commit be1d6c6

Please sign in to comment.