Skip to content

Commit

Permalink
fix: refactor @best/agent using types (#161)
Browse files Browse the repository at this point in the history
* fix: agents

* fix: refactor agent

* feat: timeout on upload start and delete old code

* fix: include latest types

* fix: agent tests

* fix: more types

* wip: more types

* fix: types for runner-remote and sample config

* fix: remove types in package.json

* fix: minor types and styling
  • Loading branch information
jodarove authored and Diego Ferreiro Val committed Jun 21, 2019
1 parent a96c14c commit 5578e41
Show file tree
Hide file tree
Showing 23 changed files with 682 additions and 444 deletions.
6 changes: 3 additions & 3 deletions packages/@best/agent/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
"@best/utils": "4.0.0",
"chalk": "~2.4.2",
"express": "~4.16.2",
"socket.io": "~2.0.4",
"socket.io-file": "~2.0.2",
"tar": "~4.2.0"
"socket.io": "~2.2.0",
"socket.io-file": "~2.0.31",
"tar": "~4.4.10"
}
}
50 changes: 50 additions & 0 deletions packages/@best/agent/src/AgentApp.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import * as SocketIO from "socket.io";
import ObservableQueue from "./utils/ObservableQueue";
import BenchmarkRunner, { RunnerStatus } from "./BenchmarkRunner";
import BenchmarkTask from "./BenchmarkTask";
import { BuildConfig } from "@best/types";

export class AgentApp {
private queue: ObservableQueue<BenchmarkTask>;
private runner: BenchmarkRunner;

constructor(queue: ObservableQueue<BenchmarkTask>, runner: BenchmarkRunner) {
this.queue = queue;
this.runner = runner;

this.initializeHandlers();
}

private initializeHandlers() {
this.queue.on('item-added', (task: BenchmarkTask) => this.handleJobAddedInQueue(task));
this.runner.on('idle-runner', (runner: BenchmarkRunner) => this.handleIdleRunner(runner));
}

handleIncomingConnection(socket: SocketIO.Socket) {
socket.on('benchmark_task', (data: BuildConfig) => {
const task = new BenchmarkTask(data, socket);

socket.on('disconnect', () => {
this.queue.remove(task);
this.runner.cancelRun(task);
});

this.queue.push(task);
});
}

private handleJobAddedInQueue(task: BenchmarkTask) {
if (this.runner.status === RunnerStatus.IDLE) {
this.queue.remove(task);
this.runner.run(task);
} else {
task.socketConnection.emit('benchmark_enqueued', { pending: this.queue.size });
}
}

private handleIdleRunner(runner: BenchmarkRunner) {
if (this.queue.size > 0) {
runner.run(this.queue.pop()!);
}
}
}
153 changes: 153 additions & 0 deletions packages/@best/agent/src/BenchmarkRunner.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
import path from 'path';
import { EventEmitter } from "events";
import { runBenchmark } from '@best/runner';
import BenchmarkTask from "./BenchmarkTask";
import { loadBenchmarkJob } from "./benchmark-loader";
import { x as extractTar } from 'tar';
import * as SocketIO from "socket.io";
import { RunnerOutputStream } from "@best/console-stream";
import {
BenchmarkResultsSnapshot,
BenchmarkResultsState,
BenchmarkRuntimeConfig
} from "@best/types";

export enum RunnerStatus {
IDLE = 1,
RUNNING,
}

// @todo: make a Runner Stream, and add an interface type instead of the class.
function initializeForwarder(socket: SocketIO.Socket, logger: Function): RunnerOutputStream {
return {
init() {},
finish() {},
onBenchmarkStart(benchmarkPath: string) {
if (socket.connected) {
logger(`STATUS: running_benchmark ${benchmarkPath}`);
socket.emit('running_benchmark_start', benchmarkPath);
}
},
onBenchmarkEnd(benchmarkPath: string) {
if (socket.connected) {
logger(`STATUS: finished_benchmark ${benchmarkPath}`);
socket.emit('running_benchmark_end', benchmarkPath);
}
},
onBenchmarkError(benchmarkPath: string) {
if (socket.connected) {
socket.emit('running_benchmark_error', benchmarkPath);
}
},
updateBenchmarkProgress(state: BenchmarkResultsState, opts: BenchmarkRuntimeConfig) {
if (socket.connected) {
socket.emit('running_benchmark_update', {state, opts});
}
},
} as RunnerOutputStream;
}

function extractBenchmarkTarFile(task: BenchmarkTask) {
return ({ uploadDir } : { uploadDir: string }) => {
const benchmarkName = task.benchmarkName;
const benchmarkDirname = path.dirname(uploadDir);

task.benchmarkFolder = benchmarkDirname;
task.benchmarkEntry = path.join(benchmarkDirname, `${benchmarkName}.html`);

return extractTar({cwd: benchmarkDirname, file: uploadDir});
};
}

export default class BenchmarkRunner extends EventEmitter {
public _status: RunnerStatus = RunnerStatus.IDLE;
public runningTask: BenchmarkTask | null = null;
public runningWasCancelled = false;
private _log: Function = () => {};

get status() {
return this._status;
}

set status(value: RunnerStatus) {
if (value !== this._status) {
this._status = value;
if (value === RunnerStatus.IDLE) {
this.emit('idle-runner', this);
}
}
}

cancelRun(task: BenchmarkTask) {
if (this.runningTask === task) {
this._log('Running was cancelled.');
this.runningWasCancelled = true;
}
}

run(task: BenchmarkTask) {
if (this.status !== RunnerStatus.IDLE) {
throw new Error("Trying to run a new benchmark while runner is busy");
}

this.status = RunnerStatus.RUNNING;
this.runningWasCancelled = false;
this.runningTask = task;
this._log = (msg: string) => {
if (!this.runningWasCancelled) {
process.stdout.write(`Task[${task.socketConnection.id}] - ${msg}\n`);
}
};

// @todo: just to be safe, add timeout in cancel so it waits for the runner to finish or dismiss the run assuming something went wrong
loadBenchmarkJob(task.socketConnection)
.then(extractBenchmarkTarFile(task))
.then(() => this.runBenchmark(task))
.then(({ error, results }: {error: any, results: any}) => {
this.afterRunBenchmark(error, results);
})
.catch((err: any) => {
this.afterRunBenchmark(err, null);
})
}

private async runBenchmark(task: BenchmarkTask) {
const { benchmarkName } = task;
const messenger = initializeForwarder(task.socketConnection, this._log);

let results;
let error;

try {
this._log(`Running benchmark ${benchmarkName}`);

results = await runBenchmark(task.config, messenger);

this._log(`Benchmark ${benchmarkName} completed successfully`);
} catch (err) {
this._log(`Something went wrong while running ${benchmarkName}`);
process.stderr.write(err + '\n');
error = err;
}

return { error, results }
}

private afterRunBenchmark(err: any, results: BenchmarkResultsSnapshot | null) {
if (!this.runningWasCancelled) {
this._log(`Sending results to client`);

if (err) {
this._log(`Sending error`);
this.runningTask!.socketConnection.emit('benchmark_error', err.toString());
} else {
this._log(`Sending results`);
this.runningTask!.socketConnection.emit('benchmark_results', results);
}
}

this.runningWasCancelled = false;
this.runningTask = null;
this.status = RunnerStatus.IDLE;
}
}
37 changes: 37 additions & 0 deletions packages/@best/agent/src/BenchmarkTask.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import * as SocketIO from "socket.io";
import { BuildConfig, GlobalConfig, ProjectConfig } from "@best/types";

export default class BenchmarkTask {
public projectConfig: ProjectConfig;
public globalConfig: GlobalConfig;
public socketConnection: SocketIO.Socket;
private readonly _taskConfig: BuildConfig;

constructor(taskConfig: BuildConfig, socket: SocketIO.Socket) {
this._taskConfig = taskConfig;

this.projectConfig = taskConfig.projectConfig;
this.globalConfig = taskConfig.globalConfig;
this.socketConnection = socket;
}

get config() {
return this._taskConfig;
}

get benchmarkName() {
return this._taskConfig.benchmarkName;
}

get benchmarkSignature() {
return this._taskConfig.benchmarkSignature;
}

set benchmarkEntry(value: string) {
this._taskConfig.benchmarkEntry = value;
}

set benchmarkFolder(value: string) {
this._taskConfig.benchmarkFolder = value;
}
}
Loading

0 comments on commit 5578e41

Please sign in to comment.