diff --git a/packages/@best/agent-hub/src/Agent.ts b/packages/@best/agent-hub/src/Agent.ts index 44f85dc3..f86bf70e 100644 --- a/packages/@best/agent-hub/src/Agent.ts +++ b/packages/@best/agent-hub/src/Agent.ts @@ -8,6 +8,8 @@ import BenchmarkJob from "./BenchmarkJob"; import { EventEmitter} from "events"; import socketIO from "socket.io-client"; +import http from 'http'; +import https from 'https'; // @todo: use this indirectly... make an abstraction for the runner in agent import SocketIOFile from "@best/runner-remote/build/file-uploader"; import { BenchmarkResultsSnapshot, BenchmarkResultsState, BenchmarkRuntimeConfig } from "@best/types"; @@ -77,6 +79,11 @@ export class Agent extends EventEmitter { } this.status = AgentStatus.RunningJob; + if (!await this.isAlive()) { + this.status = AgentStatus.Offline; + throw new Error(AGENT_CONNECTION_ERROR); + } + // load the tar file... try { await loadBenchmarkJob(job); @@ -85,7 +92,7 @@ export class Agent extends EventEmitter { this._logger.event(job.socketConnection.id, 'benchmark error', err, false); job.socketConnection.emit('benchmark_error', err); this.status = AgentStatus.Idle; - return ; + return; } // eventually this can become a runner... @@ -98,7 +105,7 @@ export class Agent extends EventEmitter { // @todo: move to failures queue if (err.message === AGENT_CONNECTION_ERROR) { this.status = AgentStatus.Offline; - // TODO: in this case, we need to re-run the job on a different agent (if we have one) + throw err; } else { this.status = AgentStatus.Idle; } @@ -118,6 +125,21 @@ export class Agent extends EventEmitter { return this.status === AgentStatus.Idle; } + async isAlive() { + return new Promise((resolve) => { + const request = this._config.host.toLowerCase().startsWith('https') ? https : http; + request.get(this._config.host, res => { + if (res.statusCode === 200) { + resolve(true); + } else { + resolve(false); + } + }).on('error', error => { + resolve(false); + }).end(); + }); + } + private async proxifyJob(job: BenchmarkJob) { const self = this; const remoteAgentRunnerConfig = Object.assign( diff --git a/packages/@best/agent-hub/src/AgentManager.ts b/packages/@best/agent-hub/src/AgentManager.ts index 594dc828..5171ccbc 100644 --- a/packages/@best/agent-hub/src/AgentManager.ts +++ b/packages/@best/agent-hub/src/AgentManager.ts @@ -32,9 +32,7 @@ export class AgentManager extends EventEmitter { getIdleAgentForJob(job: BenchmarkJob): Agent | null { // @todo: organize Agents by category. - const idleAgentsForJob = this.agents.filter(agent => agent.isIdle() && agent.canRunJob(job.spec)); - - return idleAgentsForJob.length ? idleAgentsForJob[0] : null; + return this.agents.find(agent => agent.isIdle() && agent.canRunJob(job.spec)) || null; } existAgentWithSpec(spec: Spec): boolean { diff --git a/packages/@best/agent-hub/src/HubApplication.ts b/packages/@best/agent-hub/src/HubApplication.ts index 1faa2f42..02170e36 100644 --- a/packages/@best/agent-hub/src/HubApplication.ts +++ b/packages/@best/agent-hub/src/HubApplication.ts @@ -133,7 +133,9 @@ export class HubApplication { if (agent !== null) { this._incomingQueue.remove(job); - agent.runJob(job); + agent.runJob(job).catch(err => { + this._incomingQueue.push(job); + }); this._logger.event("Hub", 'PENDING_JOB_CHANGED'); } else { job.socketConnection.emit('benchmark_enqueued', { pending: this._incomingQueue.size }); @@ -219,7 +221,9 @@ export class HubApplication { if (agent.canRunJob(job!.spec)) { this._incomingQueue.remove(job!); this._logger.event("Hub", 'PENDING_JOB_CHANGED'); - agent.runJob(job!); + agent.runJob(job!).catch( (err) => { + this._incomingQueue.push(job!); + }); break; } }