Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
242 changes: 34 additions & 208 deletions agent/bin/agent.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,194 +3,66 @@
// found in the LICENSE file.

import 'dart:async';
import 'dart:convert';
import 'dart:io';

import 'package:args/args.dart';
import 'package:http/http.dart';
import 'package:meta/meta.dart';
import 'package:stack_trace/stack_trace.dart';

import 'package:cocoon_agent/src/adb.dart';
import 'package:cocoon_agent/src/analysis.dart';
import 'package:cocoon_agent/src/firebase.dart';
import 'package:cocoon_agent/src/framework.dart';
import 'package:cocoon_agent/src/gallery.dart';
import 'package:cocoon_agent/src/golem.dart';
import 'package:cocoon_agent/src/perf_tests.dart';
import 'package:cocoon_agent/src/refresh.dart';
import 'package:cocoon_agent/src/size_tests.dart';
import 'package:cocoon_agent/src/agent.dart';
import 'package:cocoon_agent/src/commands/ci.dart';
import 'package:cocoon_agent/src/commands/run.dart';
import 'package:cocoon_agent/src/utils.dart';

/// Agents periodically poll the server for more tasks. This sleep period is
/// used to prevent us from DDoS-ing the server.
const Duration _sleepBetweenBuilds = const Duration(seconds: 10);

final List<StreamSubscription> _streamSubscriptions = <StreamSubscription>[];
Future<Null> main(List<String> rawArgs) async {
ArgParser argParser = new ArgParser()
..addOption(
'config-file',
abbr: 'c',
defaultsTo: 'config.yaml'
)
..addOption(
'agent-checkout',
abbr: 'a',
defaultsTo: '.'
);
argParser.addCommand('ci');
argParser.addCommand('run', RunCommand.argParser);

bool _exiting = false;
ArgResults args = argParser.parse(rawArgs);

Future<Null> main(List<String> args) async {
Config.initialize(args);

print('Agent configuration:');
print(config);

Agent agent = new Agent(
baseCocoonUrl: config.baseCocoonUrl,
agentId: config.agentId,
httpClient: new AuthenticatedClient(config.agentId, config.authToken)
);

_listenToShutdownSignals();
while(!_exiting) {
try {
await _captureAsyncStacks(agent.performNextTaskIfAny);
} catch(error, chain) {
print('Caught: $error\n${(chain as Chain).terse}');
}
Map<String, Command> allCommands = <String, Command>{};

// TODO(yjbanov): report health status after running the task
await new Future.delayed(_sleepBetweenBuilds);
void registerCommand(Command command) {
allCommands[command.name] = command;
}
}

void _listenToShutdownSignals() {
_streamSubscriptions.addAll(<StreamSubscription>[
ProcessSignal.SIGINT.watch().listen((_) {
print('\nReceived SIGINT. Shutting down.');
_stop(ProcessSignal.SIGINT);
}),
ProcessSignal.SIGTERM.watch().listen((_) {
print('\nReceived SIGTERM. Shutting down.');
_stop(ProcessSignal.SIGTERM);
}),
]);
}

Future<Null> _stop(ProcessSignal signal) async {
_exiting = true;
for (StreamSubscription sub in _streamSubscriptions) {
await sub.cancel();
}
_streamSubscriptions.clear();
// TODO(yjbanov): stop processes launched by tasks, if any
await new Future.delayed(const Duration(seconds: 1));
exit(0);
}

class Agent {
Agent({@required this.baseCocoonUrl, @required this.agentId, @required this.httpClient});

final String baseCocoonUrl;
final String agentId;
final Client httpClient;

/// Makes a REST API request to Cocoon.
Future<dynamic> _cocoon(String apiPath, dynamic json) async {
String url = '$baseCocoonUrl/api/$apiPath';
Response resp = await httpClient.post(url, body: JSON.encode(json));
return JSON.decode(resp.body);
}

Future<Null> performNextTaskIfAny() async {
Map<String, dynamic> reservation = await reserveTask();
if (reservation['TaskEntity'] != null) {
String taskName = reservation['TaskEntity']['Task']['Name'];
String taskKey = reservation['TaskEntity']['Key'];
String revision = reservation['ChecklistEntity']['Checklist']['Commit']['Sha'];
section('Task info');
print('name : $taskName');
print('key : $taskKey');
print('revision : $revision');

try {
await _captureAsyncStacks(() async {
await getFlutterAt(revision);
int golemRevision = await computeGolemRevision();
DateTime revisionTimestamp = await getFlutterRepoCommitTimestamp(revision);
String dartSdkVersion = await getDartVersion();
Task task = getTask(taskName, revision, revisionTimestamp, dartSdkVersion);
TaskRunner runner = new TaskRunner(revision, golemRevision, <Task>[task]);
BuildResult result = await _runTask(runner);
// TODO(yjbanov): upload logs
if (result.succeeded) {
await updateTaskStatus(taskKey, 'Succeeded');
await _uploadDataToFirebase(result);
} else {
await updateTaskStatus(taskKey, 'Failed');
}
});
} catch(error, chain) {
// TODO(yjbanov): upload logs
print('Caught: $error\n${(chain as Chain).terse}');
await updateTaskStatus(taskKey, 'Failed');
}
}
}

Future<Null> _screenOff() async {
try {
await (await adb()).sendToSleep();
} catch(error, stackTrace) {
print('Failed to turn off screen: $error\n$stackTrace');
}
}
registerCommand(new ContinuousIntegrationCommand(agent));
registerCommand(new RunCommand(agent));

Future<BuildResult> _runTask(TaskRunner runner) async {
// Load-balance tests across attached devices
await pickNextDevice();
try {
return await runner.run();
} finally {
await _screenOff();
}
if (args.command == null) {
print('No command specified, expected one of: ${allCommands.keys.join(', ')}');
exit(1);
}

Task getTask(String taskName, String revision, DateTime revisionTimestamp, String dartSdkVersion) {
List<Task> allTasks = <Task>[
createComplexLayoutScrollPerfTest(),
createFlutterGalleryStartupTest(),
createComplexLayoutStartupTest(),
createFlutterGalleryBuildTest(),
createComplexLayoutBuildTest(),
createGalleryTransitionTest(),
createBasicMaterialAppSizeTest(),
createAnalyzerCliTest(sdk: dartSdkVersion, commit: revision, timestamp: revisionTimestamp),
createAnalyzerServerTest(sdk: dartSdkVersion, commit: revision, timestamp: revisionTimestamp),
createRefreshTest(commit: revision, timestamp: revisionTimestamp),
];
Command command = allCommands[args.command.name];

return allTasks.firstWhere(
(Task t) => t.name == taskName,
orElse: () {
throw 'Task $taskName not found';
}
);
if (command == null) {
print('Unrecognized command $command');
exit(1);
}

Future<Map<String, dynamic>> reserveTask() => _cocoon('reserve-task', {
'AgentID': agentId
});

Future<Null> getFlutterAt(String revision) async {
String currentRevision = await getCurrentFlutterRepoCommit();

// This agent will likely run multiple tasks in the same checklist and
// therefore the same revision. It would be too costly to have to reinstall
// Flutter every time.
if (currentRevision == revision) {
return;
}

await getFlutter(revision);
}
print('Agent configuration:');
print(config);

Future<Null> updateTaskStatus(String taskKey, String newStatus) async {
await _cocoon('update-task-status', {
'TaskKey': taskKey,
'NewStatus': newStatus,
});
}
await command.run(args.command);
}

class AuthenticatedClient extends BaseClient {
Expand All @@ -211,49 +83,3 @@ class AuthenticatedClient extends BaseClient {
return resp;
}
}

Future<Null> _uploadDataToFirebase(BuildResult result) async {
List<Map<String, dynamic>> golemData = <Map<String, dynamic>>[];

for (TaskResult taskResult in result.results) {
// TODO(devoncarew): We should also upload the fact that these tasks failed.
if (taskResult.data == null)
continue;

Map<String, dynamic> data = new Map<String, dynamic>.from(taskResult.data.json);

if (taskResult.data.benchmarkScoreKeys != null) {
for (String scoreKey in taskResult.data.benchmarkScoreKeys) {
String benchmarkName = '${taskResult.task.name}.$scoreKey';
if (registeredBenchmarkNames.contains(benchmarkName)) {
golemData.add(<String, dynamic>{
'benchmark_name': benchmarkName,
'golem_revision': result.golemRevision,
'score': taskResult.data.json[scoreKey],
});
}
}
}

data['__metadata__'] = <String, dynamic>{
'success': taskResult.succeeded,
'revision': taskResult.revision,
'message': taskResult.message,
};

data['__golem__'] = golemData;

uploadToFirebase(taskResult.task.name, data);
}
}

Future<Null> _captureAsyncStacks(Future<Null> callback()) {
Completer<Null> completer = new Completer<Null>();
Chain.capture(() async {
await callback();
completer.complete();
}, onError: (error, Chain chain) async {
completer.completeError(error, chain);
});
return completer.future;
}
Loading