diff --git a/agent/bin/agent.dart b/agent/bin/agent.dart index d15ce1462e..2ecf78ab72 100644 --- a/agent/bin/agent.dart +++ b/agent/bin/agent.dart @@ -3,194 +3,66 @@ // found in the LICENSE file. import 'dart:async'; -import 'dart:convert'; import 'dart:io'; +import 'package:args/args.dart'; import 'package:http/http.dart'; -import 'package:meta/meta.dart'; -import 'package:stack_trace/stack_trace.dart'; -import 'package:cocoon_agent/src/adb.dart'; -import 'package:cocoon_agent/src/analysis.dart'; -import 'package:cocoon_agent/src/firebase.dart'; -import 'package:cocoon_agent/src/framework.dart'; -import 'package:cocoon_agent/src/gallery.dart'; -import 'package:cocoon_agent/src/golem.dart'; -import 'package:cocoon_agent/src/perf_tests.dart'; -import 'package:cocoon_agent/src/refresh.dart'; -import 'package:cocoon_agent/src/size_tests.dart'; +import 'package:cocoon_agent/src/agent.dart'; +import 'package:cocoon_agent/src/commands/ci.dart'; +import 'package:cocoon_agent/src/commands/run.dart'; import 'package:cocoon_agent/src/utils.dart'; -/// Agents periodically poll the server for more tasks. This sleep period is -/// used to prevent us from DDoS-ing the server. -const Duration _sleepBetweenBuilds = const Duration(seconds: 10); - -final List _streamSubscriptions = []; +Future main(List rawArgs) async { + ArgParser argParser = new ArgParser() + ..addOption( + 'config-file', + abbr: 'c', + defaultsTo: 'config.yaml' + ) + ..addOption( + 'agent-checkout', + abbr: 'a', + defaultsTo: '.' + ); + argParser.addCommand('ci'); + argParser.addCommand('run', RunCommand.argParser); -bool _exiting = false; + ArgResults args = argParser.parse(rawArgs); -Future main(List args) async { Config.initialize(args); - print('Agent configuration:'); - print(config); - Agent agent = new Agent( baseCocoonUrl: config.baseCocoonUrl, agentId: config.agentId, httpClient: new AuthenticatedClient(config.agentId, config.authToken) ); - _listenToShutdownSignals(); - while(!_exiting) { - try { - await _captureAsyncStacks(agent.performNextTaskIfAny); - } catch(error, chain) { - print('Caught: $error\n${(chain as Chain).terse}'); - } + Map allCommands = {}; - // TODO(yjbanov): report health status after running the task - await new Future.delayed(_sleepBetweenBuilds); + void registerCommand(Command command) { + allCommands[command.name] = command; } -} - -void _listenToShutdownSignals() { - _streamSubscriptions.addAll([ - ProcessSignal.SIGINT.watch().listen((_) { - print('\nReceived SIGINT. Shutting down.'); - _stop(ProcessSignal.SIGINT); - }), - ProcessSignal.SIGTERM.watch().listen((_) { - print('\nReceived SIGTERM. Shutting down.'); - _stop(ProcessSignal.SIGTERM); - }), - ]); -} -Future _stop(ProcessSignal signal) async { - _exiting = true; - for (StreamSubscription sub in _streamSubscriptions) { - await sub.cancel(); - } - _streamSubscriptions.clear(); - // TODO(yjbanov): stop processes launched by tasks, if any - await new Future.delayed(const Duration(seconds: 1)); - exit(0); -} - -class Agent { - Agent({@required this.baseCocoonUrl, @required this.agentId, @required this.httpClient}); - - final String baseCocoonUrl; - final String agentId; - final Client httpClient; - - /// Makes a REST API request to Cocoon. - Future _cocoon(String apiPath, dynamic json) async { - String url = '$baseCocoonUrl/api/$apiPath'; - Response resp = await httpClient.post(url, body: JSON.encode(json)); - return JSON.decode(resp.body); - } - - Future performNextTaskIfAny() async { - Map reservation = await reserveTask(); - if (reservation['TaskEntity'] != null) { - String taskName = reservation['TaskEntity']['Task']['Name']; - String taskKey = reservation['TaskEntity']['Key']; - String revision = reservation['ChecklistEntity']['Checklist']['Commit']['Sha']; - section('Task info'); - print('name : $taskName'); - print('key : $taskKey'); - print('revision : $revision'); - - try { - await _captureAsyncStacks(() async { - await getFlutterAt(revision); - int golemRevision = await computeGolemRevision(); - DateTime revisionTimestamp = await getFlutterRepoCommitTimestamp(revision); - String dartSdkVersion = await getDartVersion(); - Task task = getTask(taskName, revision, revisionTimestamp, dartSdkVersion); - TaskRunner runner = new TaskRunner(revision, golemRevision, [task]); - BuildResult result = await _runTask(runner); - // TODO(yjbanov): upload logs - if (result.succeeded) { - await updateTaskStatus(taskKey, 'Succeeded'); - await _uploadDataToFirebase(result); - } else { - await updateTaskStatus(taskKey, 'Failed'); - } - }); - } catch(error, chain) { - // TODO(yjbanov): upload logs - print('Caught: $error\n${(chain as Chain).terse}'); - await updateTaskStatus(taskKey, 'Failed'); - } - } - } - - Future _screenOff() async { - try { - await (await adb()).sendToSleep(); - } catch(error, stackTrace) { - print('Failed to turn off screen: $error\n$stackTrace'); - } - } + registerCommand(new ContinuousIntegrationCommand(agent)); + registerCommand(new RunCommand(agent)); - Future _runTask(TaskRunner runner) async { - // Load-balance tests across attached devices - await pickNextDevice(); - try { - return await runner.run(); - } finally { - await _screenOff(); - } + if (args.command == null) { + print('No command specified, expected one of: ${allCommands.keys.join(', ')}'); + exit(1); } - Task getTask(String taskName, String revision, DateTime revisionTimestamp, String dartSdkVersion) { - List allTasks = [ - createComplexLayoutScrollPerfTest(), - createFlutterGalleryStartupTest(), - createComplexLayoutStartupTest(), - createFlutterGalleryBuildTest(), - createComplexLayoutBuildTest(), - createGalleryTransitionTest(), - createBasicMaterialAppSizeTest(), - createAnalyzerCliTest(sdk: dartSdkVersion, commit: revision, timestamp: revisionTimestamp), - createAnalyzerServerTest(sdk: dartSdkVersion, commit: revision, timestamp: revisionTimestamp), - createRefreshTest(commit: revision, timestamp: revisionTimestamp), - ]; + Command command = allCommands[args.command.name]; - return allTasks.firstWhere( - (Task t) => t.name == taskName, - orElse: () { - throw 'Task $taskName not found'; - } - ); + if (command == null) { + print('Unrecognized command $command'); + exit(1); } - Future> reserveTask() => _cocoon('reserve-task', { - 'AgentID': agentId - }); - - Future getFlutterAt(String revision) async { - String currentRevision = await getCurrentFlutterRepoCommit(); - - // This agent will likely run multiple tasks in the same checklist and - // therefore the same revision. It would be too costly to have to reinstall - // Flutter every time. - if (currentRevision == revision) { - return; - } - - await getFlutter(revision); - } + print('Agent configuration:'); + print(config); - Future updateTaskStatus(String taskKey, String newStatus) async { - await _cocoon('update-task-status', { - 'TaskKey': taskKey, - 'NewStatus': newStatus, - }); - } + await command.run(args.command); } class AuthenticatedClient extends BaseClient { @@ -211,49 +83,3 @@ class AuthenticatedClient extends BaseClient { return resp; } } - -Future _uploadDataToFirebase(BuildResult result) async { - List> golemData = >[]; - - for (TaskResult taskResult in result.results) { - // TODO(devoncarew): We should also upload the fact that these tasks failed. - if (taskResult.data == null) - continue; - - Map data = new Map.from(taskResult.data.json); - - if (taskResult.data.benchmarkScoreKeys != null) { - for (String scoreKey in taskResult.data.benchmarkScoreKeys) { - String benchmarkName = '${taskResult.task.name}.$scoreKey'; - if (registeredBenchmarkNames.contains(benchmarkName)) { - golemData.add({ - 'benchmark_name': benchmarkName, - 'golem_revision': result.golemRevision, - 'score': taskResult.data.json[scoreKey], - }); - } - } - } - - data['__metadata__'] = { - 'success': taskResult.succeeded, - 'revision': taskResult.revision, - 'message': taskResult.message, - }; - - data['__golem__'] = golemData; - - uploadToFirebase(taskResult.task.name, data); - } -} - -Future _captureAsyncStacks(Future callback()) { - Completer completer = new Completer(); - Chain.capture(() async { - await callback(); - completer.complete(); - }, onError: (error, Chain chain) async { - completer.completeError(error, chain); - }); - return completer.future; -} diff --git a/agent/lib/src/agent.dart b/agent/lib/src/agent.dart new file mode 100644 index 0000000000..5c5ffbfa7c --- /dev/null +++ b/agent/lib/src/agent.dart @@ -0,0 +1,155 @@ +// Copyright 2016 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +import 'dart:async'; +import 'dart:convert'; + +import 'package:args/args.dart'; +import 'package:http/http.dart'; +import 'package:meta/meta.dart'; + +import 'package:cocoon_agent/src/adb.dart'; +import 'package:cocoon_agent/src/analysis.dart'; +import 'package:cocoon_agent/src/framework.dart'; +import 'package:cocoon_agent/src/gallery.dart'; +import 'package:cocoon_agent/src/golem.dart'; +import 'package:cocoon_agent/src/perf_tests.dart'; +import 'package:cocoon_agent/src/refresh.dart'; +import 'package:cocoon_agent/src/size_tests.dart'; +import 'package:cocoon_agent/src/utils.dart'; + +/// Contains information about a Cocoon task. +class CocoonTask { + CocoonTask({this.name, this.key, this.revision}); + + final String name; + final String key; + final String revision; +} + +/// Client to the Coocon backend. +class Agent { + Agent({@required this.baseCocoonUrl, @required this.agentId, @required this.httpClient}); + + final String baseCocoonUrl; + final String agentId; + final Client httpClient; + + /// Makes a REST API request to Cocoon. + Future _cocoon(String apiPath, dynamic json) async { + String url = '$baseCocoonUrl/api/$apiPath'; + Response resp = await httpClient.post(url, body: JSON.encode(json)); + return JSON.decode(resp.body); + } + + Future performTask(CocoonTask reservation) async { + int golemRevision = await computeGolemRevision(); + Task task = await getTask(reservation); + TaskRunner runner = new TaskRunner(reservation.revision, golemRevision, [task]); + try { + return await runner.run(); + } finally { + await _screenOff(); + } + } + + Future _screenOff() async { + try { + await (await adb()).sendToSleep(); + } catch(error, stackTrace) { + print('Failed to turn off screen: $error\n$stackTrace'); + } + } + + Future uploadLogChunk(CocoonTask task, String chunk) async { + String url = '$baseCocoonUrl/api/append-log?ownerKey=${task.key}'; + Response resp = await httpClient.post(url, body: chunk); + if (resp.statusCode != 200) { + throw 'Failed uploading log chunk. Server responded with HTTP status ${resp.statusCode}\n' + '${resp.body}'; + } + } + + Future getTask(CocoonTask task) async { + DateTime revisionTimestamp = await getFlutterRepoCommitTimestamp(task.revision); + String dartSdkVersion = await getDartVersion(); + + List allTasks = [ + createComplexLayoutScrollPerfTest(), + createFlutterGalleryStartupTest(), + createComplexLayoutStartupTest(), + createFlutterGalleryBuildTest(), + createComplexLayoutBuildTest(), + createGalleryTransitionTest(), + createBasicMaterialAppSizeTest(), + createAnalyzerCliTest(sdk: dartSdkVersion, commit: task.revision, timestamp: revisionTimestamp), + createAnalyzerServerTest(sdk: dartSdkVersion, commit: task.revision, timestamp: revisionTimestamp), + createRefreshTest(commit: task.revision, timestamp: revisionTimestamp), + ]; + + return allTasks.firstWhere( + (Task t) => t.name == task.name, + orElse: () { + throw 'Task $task.name not found'; + } + ); + } + + /// Reserves a task in Cocoon backend to be performed by this agent. + /// + /// If not tasks are available returns `null`. + Future reserveTask() async { + Map reservation = await _cocoon('reserve-task', { + 'AgentID': agentId + }); + + if (reservation['TaskEntity'] != null) { + return new CocoonTask( + name: reservation['TaskEntity']['Task']['Name'], + key: reservation['TaskEntity']['Key'], + revision: reservation['ChecklistEntity']['Checklist']['Commit']['Sha'] + ); + } + + return null; + } + + Future updateTaskStatus(String taskKey, String newStatus) async { + await _cocoon('update-task-status', { + 'TaskKey': taskKey, + 'NewStatus': newStatus, + }); + } +} + +class AuthenticatedClient extends BaseClient { + AuthenticatedClient(this._agentId, this._authToken); + + final String _agentId; + final String _authToken; + final Client _delegate = new Client(); + + Future send(Request request) async { + request.headers['Agent-ID'] = _agentId; + request.headers['Agent-Auth-Token'] = _authToken; + StreamedResponse resp = await _delegate.send(request); + + if (resp.statusCode != 200) + throw 'HTTP error ${resp.statusCode}:\n${(await Response.fromStream(resp)).body}'; + + return resp; + } +} + +abstract class Command { + Command(this.name, this.agent); + + /// Command name as it appears in the CLI. + final String name; + + /// Coocon agent client. + final Agent agent; + + Future run(ArgResults args); +} diff --git a/agent/lib/src/commands/ci.dart b/agent/lib/src/commands/ci.dart new file mode 100644 index 0000000000..387212bc30 --- /dev/null +++ b/agent/lib/src/commands/ci.dart @@ -0,0 +1,137 @@ +// Copyright 2016 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +import 'dart:async'; +import 'dart:convert'; +import 'dart:io'; + +import 'package:args/args.dart'; + +import '../agent.dart'; +import '../utils.dart'; + +/// Agents periodically poll the server for more tasks. This sleep period is +/// used to prevent us from DDoS-ing the server. +const Duration _sleepBetweenBuilds = const Duration(seconds: 10); + +/// Maximum amount of time a single task is allowed to take. +/// +/// After that the task is killed and reported as failed. +const Duration _taskTimeout = const Duration(minutes: 10); + +/// Runs the agent in continuous integration mode. +/// +/// In this mode the agent runs in an infinite loop, continuously asking for +/// more tasks from Cocoon, performing them and reporting back results. +class ContinuousIntegrationCommand extends Command { + ContinuousIntegrationCommand(Agent agent) : super('ci', agent); + + final List _streamSubscriptions = []; + + bool _exiting = false; + + @override + Future run(ArgResults args) async { + _listenToShutdownSignals(); + while(!_exiting) { + try { + CocoonTask task = await agent.reserveTask(); + try { + if (task != null) { + // Sync flutter outside of the task so it does not contribute to + // the task timeout. + await getFlutterAt(task.revision); + + // No need to pass revision as repo syncing is done here. + List runnerArgs = [ + 'run', + '--task-name=${task.name}', + '--task-key=${task.key}', + ]; + + Process proc = await startProcess( + dartBin, + [config.runTaskFile.path]..addAll(runnerArgs), + onKill: new Future.delayed(_taskTimeout) + ); + + _logStandardStreams(task, proc); + await proc.exitCode; + } + } catch(error, stackTrace) { + print('ERROR: $error\n$stackTrace'); + await agent.updateTaskStatus(task.key, 'Failed'); + } + } catch(error, stackTrace) { + print('ERROR: $error\n$stackTrace'); + } + + // TODO(yjbanov): report health status after running the task + await new Future.delayed(_sleepBetweenBuilds); + } + } + + /// Listens to standard output and upload logs to Cocoon in semi-realtime. + Future _logStandardStreams(CocoonTask task, Process proc) async { + StringBuffer buffer = new StringBuffer(); + + Future sendLog(String message, {bool flush: false}) async { + buffer.write(message); + print('[task runner] $message'); + // Send in chunks 100KB each, or upon request. + if (flush || buffer.length > 100000) { + String chunk = buffer.toString(); + buffer = new StringBuffer(); + await agent.uploadLogChunk(task, chunk); + } + } + + proc.stdout.transform(UTF8.decoder).listen((String s) { + sendLog(s); + }); + proc.stderr.transform(UTF8.decoder).listen((String s) { + sendLog(s); + }); + await proc.exitCode; + sendLog('Task execution finished', flush: true); + } + + void _listenToShutdownSignals() { + _streamSubscriptions.addAll([ + ProcessSignal.SIGINT.watch().listen((_) { + print('\nReceived SIGINT. Shutting down.'); + _stop(ProcessSignal.SIGINT); + }), + ProcessSignal.SIGTERM.watch().listen((_) { + print('\nReceived SIGTERM. Shutting down.'); + _stop(ProcessSignal.SIGTERM); + }), + ]); + } + + Future _stop(ProcessSignal signal) async { + _exiting = true; + for (StreamSubscription sub in _streamSubscriptions) { + await sub.cancel(); + } + _streamSubscriptions.clear(); + // TODO(yjbanov): stop processes launched by tasks, if any + await new Future.delayed(const Duration(seconds: 1)); + exit(0); + } +} + +Future getFlutterAt(String revision) async { + String currentRevision = await getCurrentFlutterRepoCommit(); + + // This agent will likely run multiple tasks in the same checklist and + // therefore the same revision. It would be too costly to have to reinstall + // Flutter every time. + if (currentRevision == revision) { + print('Reusing previously checked out Flutter revision: $revision'); + return; + } + + await getFlutter(revision); +} diff --git a/agent/lib/src/commands/run.dart b/agent/lib/src/commands/run.dart new file mode 100644 index 0000000000..7ef44a9d90 --- /dev/null +++ b/agent/lib/src/commands/run.dart @@ -0,0 +1,115 @@ +// Copyright 2016 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +import 'dart:async'; +import 'dart:io'; + +import 'package:args/args.dart'; +import 'package:stack_trace/stack_trace.dart'; + +import '../adb.dart'; +import '../agent.dart'; +import '../golem.dart'; +import '../firebase.dart'; +import '../framework.dart'; +import '../utils.dart'; + +/// Runs the agent once to perform a specific task and, optionally, report the +/// result back to Cocoon backend. +/// +/// The task is run at the currently synced revision of Flutter. This command +/// assumes that Flutter is already synced to the desired revision. +class RunCommand extends Command { + RunCommand(Agent agent) : super('run', agent); + + static final ArgParser argParser = new ArgParser() + ..addOption( + 'task-name', + help: '(required) The name of the task to run.' + ) + ..addOption( + 'task-key', + help: '(optional) The key of the task to update the status of in Cocoon. ' + 'It is only required if you want the runner to upload logs and ' + 'change task status in Cocoon.' + ); + + @override + Future run(ArgResults args) async { + CocoonTask task = new CocoonTask( + name: args['task-name'], + key: args['task-key'], + revision: await getCurrentFlutterRepoCommit() + ); + + section('Task info:'); + print(' name : ${task.name}'); + print(' key : ${task.key ?? ""}'); + print(' revision : ${task.revision}'); + + if (task.name == null || task.name == '') { + print('\n Incorrect command-line options. Usage:'); + print(argParser.usage); + exit(1); + } + + try { + await runAndCaptureAsyncStacks(() async { + // Load-balance tests across attached devices + pickNextDevice(); + + BuildResult result = await agent.performTask(task); + if (task.key != null) { + if (result.succeeded) { + await agent.updateTaskStatus(task.key, 'Succeeded'); + await _uploadDataToFirebase(result); + } else { + await agent.updateTaskStatus(task.key, 'Failed'); + } + } + }); + } catch(error, chain) { + // TODO(yjbanov): upload logs + print('Caught: $error\n${(chain as Chain).terse}'); + if (task.key != null) + await agent.updateTaskStatus(task.key, 'Failed'); + exit(1); + } + } +} + +Future _uploadDataToFirebase(BuildResult result) async { + List> golemData = >[]; + + for (TaskResult taskResult in result.results) { + // TODO(devoncarew): We should also upload the fact that these tasks failed. + if (taskResult.data == null) + continue; + + Map data = new Map.from(taskResult.data.json); + + if (taskResult.data.benchmarkScoreKeys != null) { + for (String scoreKey in taskResult.data.benchmarkScoreKeys) { + String benchmarkName = '${taskResult.task.name}.$scoreKey'; + if (registeredBenchmarkNames.contains(benchmarkName)) { + golemData.add({ + 'benchmark_name': benchmarkName, + 'golem_revision': result.golemRevision, + 'score': taskResult.data.json[scoreKey], + }); + } + } + } + + data['__metadata__'] = { + 'success': taskResult.succeeded, + 'revision': taskResult.revision, + 'message': taskResult.message, + }; + + data['__golem__'] = golemData; + + uploadToFirebase(taskResult.task.name, data); + } +} diff --git a/agent/lib/src/utils.dart b/agent/lib/src/utils.dart index 4a5c2adc39..0cb96f4121 100644 --- a/agent/lib/src/utils.dart +++ b/agent/lib/src/utils.dart @@ -9,6 +9,7 @@ import 'dart:io'; import 'package:args/args.dart'; import 'package:meta/meta.dart'; import 'package:path/path.dart' as path; +import 'package:stack_trace/stack_trace.dart'; import 'package:yaml/yaml.dart'; /// Virtual current working directory, which affect functions, such as [exec]. @@ -111,6 +112,7 @@ Future getFlutterRepoCommitTimestamp(String commit) { Future startProcess(String executable, List arguments, {Map env, Future onKill}) async { + print('Executing: $executable ${arguments?.join(' ') ?? ""}'); Process proc = await Process.start(executable, arguments, environment: env, workingDirectory: cwd); if (onKill != null) { @@ -135,7 +137,6 @@ Future startProcess(String executable, List arguments, /// Executes a command and returns its exit code. Future exec(String executable, List arguments, {Map env, bool canFail: false, Future onKill}) async { - print('Executing: $executable ${arguments.join(' ')}'); Process proc = await startProcess(executable, arguments, env: env, onKill: onKill); proc.stdout @@ -156,11 +157,14 @@ Future exec(String executable, List arguments, } /// Executes a command and returns its standard output as a String. +/// +/// Standard error is redirected to the current process' standard error stream. Future eval(String executable, List arguments, {Map env, bool canFail: false, Future onKill}) async { - print('Executing: $executable ${arguments.join(' ')}'); Process proc = await startProcess(executable, arguments, env: env, onKill: onKill); - stderr.addStream(proc.stderr); + proc.stderr.listen((List data) { + stderr.write(data); + }); String output = await UTF8.decodeStream(proc.stdout); int exitCode = await proc.exitCode; @@ -224,24 +228,23 @@ class Config { this.agentId, this.firebaseFlutterDashboardToken, this.authToken, - this.flutterDirectory + this.flutterDirectory, + this.runTaskFile }); - static final ArgParser _argParser = new ArgParser() - ..addOption( - 'config-file', - abbr: 'c', - defaultsTo: 'config.yaml' - ); - - static void initialize(List rawArgs) { - ArgResults args = _argParser.parse(rawArgs); + static void initialize(ArgResults args) { File agentConfigFile = file(args['config-file']); if (!agentConfigFile.existsSync()) { throw ('Agent config file not found: ${agentConfigFile.path}.'); } + File runTaskFile = file('${args['agent-checkout']}/bin/agent.dart'); + + if (!runTaskFile.existsSync()) { + throw ('run_task.dart file not found: ${runTaskFile.path}.'); + } + Map agentConfig = loadYaml(agentConfigFile.readAsStringSync()); String baseCocoonUrl = agentConfig['base_cocoon_url'] ?? 'https://flutter-dashboard.appspot.com'; String agentId = requireConfigProperty(agentConfig, 'agent_id'); @@ -255,7 +258,8 @@ class Config { agentId: agentId, firebaseFlutterDashboardToken: firebaseFlutterDashboardToken, authToken: authToken, - flutterDirectory: flutterDirectory + flutterDirectory: flutterDirectory, + runTaskFile: runTaskFile ); } @@ -264,6 +268,7 @@ class Config { final String firebaseFlutterDashboardToken; final String authToken; final Directory flutterDirectory; + final File runTaskFile; String get adbPath { String androidHome = Platform.environment['ANDROID_HOME']; @@ -286,6 +291,7 @@ class Config { baseCocoonUrl: $baseCocoonUrl agentId: $agentId flutterDirectory: $flutterDirectory +runTaskFile: $runTaskFile adbPath: $adbPath '''.trim(); } @@ -423,3 +429,26 @@ Iterable grep(Pattern pattern, {@required String from}) { return line.contains(pattern); }); } + +/// Captures asynchronous stack traces thrown by [callback]. +/// +/// This is a convenience wrapper around [Chain] optimized for use with +/// `async`/`await`. +/// +/// Example: +/// +/// try { +/// await captureAsyncStacks(() { /* async things */ }); +/// } catch (error, chain) { +/// +/// } +Future runAndCaptureAsyncStacks(Future callback()) { + Completer completer = new Completer(); + Chain.capture(() async { + await callback(); + completer.complete(); + }, onError: (error, Chain chain) async { + completer.completeError(error, chain); + }); + return completer.future; +} diff --git a/app/index.yaml b/app/index.yaml index a0c2c39ea8..3627fb3602 100644 --- a/app/index.yaml +++ b/app/index.yaml @@ -19,3 +19,9 @@ indexes: - name: Name - name: CreateTimestamp direction: desc +- kind: LogChunk + ancestor: no + properties: + - name: OwnerKey + - name: CreateTimestamp + direction: desc diff --git a/app/lib/cli.dart b/app/lib/cli.dart index 69049cd967..65ebd86e72 100644 --- a/app/lib/cli.dart +++ b/app/lib/cli.dart @@ -52,7 +52,7 @@ abstract class CliCommand { CliCommand(this.name); /// Command name as it appears in the CLI. - final name; + final String name; ArgParser get argParser; diff --git a/app/lib/components/status_table.dart b/app/lib/components/status_table.dart index b13a9b2260..ce197a4b68 100644 --- a/app/lib/components/status_table.dart +++ b/app/lib/components/status_table.dart @@ -4,6 +4,7 @@ import 'dart:async'; import 'dart:convert' show JSON; +import 'dart:html'; import 'package:angular2/angular2.dart'; import 'package:cocoon/model.dart'; @@ -36,7 +37,9 @@ import 'package:http/http.dart' as http; ({{shortSha(status.checklist.checklist.commit.author.login)}}) -
+
+
@@ -104,12 +107,29 @@ class StatusTable implements OnInit { return ['task-status-circle', statusMap[taskStatus] ?? 'task-unknown']; } - List getStatusStyle(String sha, String taskName) { + TaskEntity _findTask(String sha, String taskName) { Map row = resultMatrix[sha]; + if (row == null || !row.containsKey(taskName)) + return null; + + return row[taskName]; + } + + List getStatusStyle(String sha, String taskName) { + TaskEntity taskEntity = _findTask(sha, taskName); + + if (taskEntity == null) return taskStatusToCssStyle('Skipped'); - TaskEntity task = row[taskName]; - return taskStatusToCssStyle(task.task.status); + + return taskStatusToCssStyle(taskEntity.task.status); + } + + void openLog(String sha, String taskName) { + TaskEntity taskEntity = _findTask(sha, taskName); + + if (taskEntity != null) + window.open('/api/get-log?ownerKey=${taskEntity.key.value}', '_blank'); } } @@ -126,7 +146,7 @@ class HeaderRow { } ); for (TaskEntity taskEntity in stage.tasks) { - header.addMetaTask(taskEntity.task); + header.addMetaTask(taskEntity); } stageHeaders.sort((StageHeader a, StageHeader b) { const stagePrecedence = const [ @@ -154,19 +174,21 @@ class StageHeader { final String stageName; final List metaTasks = []; - void addMetaTask(Task task) { + void addMetaTask(TaskEntity taskEntity) { + Task task = taskEntity.task; if (metaTasks.any((MetaTask m) => m.name == task.name)) return; - metaTasks.add(new MetaTask(task.name, task.stageName)); + metaTasks.add(new MetaTask(taskEntity.key, task.name, task.stageName)); } } /// Information about a task without a result. class MetaTask { - MetaTask(this.name, String stageName) + MetaTask(this.key, this.name, String stageName) : this.stageName = stageName, iconUrl = _iconForStageName(stageName); + final Key key; final String name; final String stageName; final String iconUrl; diff --git a/app/main.go b/app/main.go index 781b0d25b5..c02c70820d 100644 --- a/app/main.go +++ b/app/main.go @@ -35,6 +35,9 @@ func init() { registerRPC("/api/reserve-task", commands.ReserveTask) registerRPC("/api/update-task-status", commands.UpdateTaskStatus) + registerRawHandler("/api/append-log", commands.AppendLog) + registerRawHandler("/api/get-log", commands.GetLog) + // IMPORTANT: This is the ONLY path that does NOT require authentication. Do // not copy this pattern. Use registerRPC instead. http.HandleFunc("/api/get-authentication-status", getAuthenticationStatus) @@ -45,17 +48,31 @@ func init() { } } -func registerRPC(path string, handler func(cocoon *db.Cocoon, inputJSON []byte) (interface{}, error)) { +// Registers a request handler that takes arbitrary HTTP requests and outputs arbitrary data back. +func registerRawHandler(path string, handler func(cocoon *db.Cocoon, w http.ResponseWriter, r *http.Request)) { http.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) { - ctx := appengine.NewContext(r) - cocoon := db.NewCocoon(ctx) - ctx, err := getAuthenticatedContext(ctx, r) + cocoon, err := getAuthenticatedContext(r) if err != nil { serveUnauthorizedAccess(w, err) return } + handler(cocoon, w, r) + }) +} + +// Registers RPC handler that takes JSON and outputs JSON data back. +func registerRPC(path string, handler func(cocoon *db.Cocoon, requestData []byte) (interface{}, error)) { + http.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) { + cocoon, err := getAuthenticatedContext(r) + + if err != nil { + serveUnauthorizedAccess(w, err) + return + } + + ctx := cocoon.Ctx bytes, err := ioutil.ReadAll(r.Body) if err != nil { serveError(ctx, w, r, err) @@ -99,7 +116,8 @@ func serveUnauthorizedAccess(w http.ResponseWriter, err error) { http.Error(w, fmt.Sprintf("Authentication/authorization error: %v", err), http.StatusUnauthorized) } -func getAuthenticatedContext(ctx context.Context, r *http.Request) (context.Context, error) { +func getAuthenticatedContext(r *http.Request) (*db.Cocoon, error) { + ctx := appengine.NewContext(r) agentAuthToken := r.Header.Get("Agent-Auth-Token") isCron := r.Header.Get("X-Appengine-Cron") == "true" if agentAuthToken != "" { @@ -112,10 +130,10 @@ func getAuthenticatedContext(ctx context.Context, r *http.Request) (context.Cont return nil, fmt.Errorf("Invalid agent: %v", agentID) } - return context.WithValue(ctx, "agent", agent), nil + return db.NewCocoon(context.WithValue(ctx, "agent", agent)), nil } else if isCron { // Authenticate cron requests that are not agents. - return ctx, nil + return db.NewCocoon(ctx), nil } else { // Authenticate as Google account. Note that it could be both a Google // account and agent. @@ -134,16 +152,14 @@ func getAuthenticatedContext(ctx context.Context, r *http.Request) (context.Cont } } - return ctx, nil + return db.NewCocoon(ctx), nil } } func getAuthenticationStatus(w http.ResponseWriter, r *http.Request) { - ctx := appengine.NewContext(r) - // Ignore returned context. This request must succeed in the presence of // errors. - _, err := getAuthenticatedContext(ctx, r) + _, err := getAuthenticatedContext(r) var status string if err == nil { @@ -152,6 +168,7 @@ func getAuthenticationStatus(w http.ResponseWriter, r *http.Request) { status = "Unauthorized" } + ctx := appengine.NewContext(r) loginURL, _ := user.LoginURL(ctx, "/build.html") logoutURL, _ := user.LogoutURL(ctx, "/build.html") diff --git a/app/web/buildStyles.css b/app/web/buildStyles.css index 14211a67d8..4e7682fa57 100644 --- a/app/web/buildStyles.css +++ b/app/web/buildStyles.css @@ -292,21 +292,26 @@ td.stats-value { .task-in-progress { background: linear-gradient(0deg, blue 0%, blue 40%, white 41%, white 59%, blue 60%, blue 100%);; animation: inProgressAnimation 2s infinite linear; + cursor: pointer; } .task-succeeded { background-color: green; + cursor: pointer; } .task-failed { background-color: red; + cursor: pointer; } .task-underperformed { background-color: yellow; + cursor: pointer; } .task-skipped { background-color: gray; } .task-unknown { background-color: #6600cc; + cursor: pointer; } @keyframes inProgressAnimation { diff --git a/commands/append_log.go b/commands/append_log.go new file mode 100644 index 0000000000..a0ca900fa8 --- /dev/null +++ b/commands/append_log.go @@ -0,0 +1,42 @@ +// Copyright (c) 2016 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +package commands + +import ( + "cocoon/db" + "fmt" + "io/ioutil" + "net/http" + + "google.golang.org/appengine/datastore" +) + +// AppendLog appends a chunk of log data to the end of the task log file. +func AppendLog(cocoon *db.Cocoon, w http.ResponseWriter, r *http.Request) { + ownerKey, err := datastore.DecodeKey(r.URL.Query().Get("ownerKey")) + + if err != nil { + serveError(cocoon, w, r, err) + return + } + + if !cocoon.EntityExists(ownerKey) { + serveError(cocoon, w, r, fmt.Errorf("Invalid owner key. Owner entity does not exist.")) + return + } + + requestData, err := ioutil.ReadAll(r.Body) + if err != nil { + serveError(cocoon, w, r, err) + return + } + + if err = cocoon.PutLogChunk(ownerKey, requestData); err != nil { + serveError(cocoon, w, r, err) + return + } + + w.Write([]byte("OK")) +} diff --git a/commands/common.go b/commands/common.go new file mode 100644 index 0000000000..3eaf5772ef --- /dev/null +++ b/commands/common.go @@ -0,0 +1,19 @@ +// Copyright (c) 2016 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +package commands + +import ( + "cocoon/db" + "fmt" + "net/http" + + "google.golang.org/appengine/log" +) + +func serveError(cocoon *db.Cocoon, w http.ResponseWriter, r *http.Request, err error) { + errorMessage := fmt.Sprintf("[%v] %v", r.URL, err) + log.Errorf(cocoon.Ctx, "%v", errorMessage) + http.Error(w, errorMessage, http.StatusInternalServerError) +} diff --git a/commands/get_log.go b/commands/get_log.go new file mode 100644 index 0000000000..7b6a7632f0 --- /dev/null +++ b/commands/get_log.go @@ -0,0 +1,57 @@ +// Copyright (c) 2016 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +package commands + +import ( + "cocoon/db" + "encoding/json" + "fmt" + "net/http" + + "google.golang.org/appengine/datastore" +) + +// GetLog returns the log file as a text file. +func GetLog(cocoon *db.Cocoon, w http.ResponseWriter, r *http.Request) { + ownerKey, err := datastore.DecodeKey(r.URL.Query().Get("ownerKey")) + + if err != nil { + serveError(cocoon, w, r, err) + return + } + + if !cocoon.EntityExists(ownerKey) { + serveError(cocoon, w, r, fmt.Errorf("Invalid owner key. Owner entity does not exist.")) + return + } + + w.Header().Set("content-type", "text/plain") + + te, _ := cocoon.GetTask(ownerKey) + js, _ := json.MarshalIndent(te, "", " ") + w.Write([]byte("\n\n------------ TASK ------------\n")) + w.Write(js) + w.Write([]byte("\n\n------------ LOG ------------\n")) + + query := datastore.NewQuery("LogChunk"). + Filter("OwnerKey =", ownerKey). + Order("-CreateTimestamp"). + Limit(100) + + for iter := query.Run(cocoon.Ctx); ; { + var chunk db.LogChunk + _, err := iter.Next(&chunk) + + if err == datastore.Done { + break + } else if err != nil { + serveError(cocoon, w, r, err) + return + } + + w.Write(chunk.Data) + } + w.Write([]byte("")) +} diff --git a/db/db.go b/db/db.go index 4017018120..ba42a8a5c8 100644 --- a/db/db.go +++ b/db/db.go @@ -9,6 +9,7 @@ import ( "fmt" "io" "sort" + "time" "golang.org/x/crypto/bcrypt" @@ -501,3 +502,15 @@ func (stage *Stage) IsPrimary() bool { name := stage.Name return name == "travis" || name == "chromebot" } + +// PutLogChunk creates a new log chunk record in the datastore. +func (c *Cocoon) PutLogChunk(ownerKey *datastore.Key, data []byte) error { + chunk := &LogChunk{ + OwnerKey: ownerKey, + CreateTimestamp: time.Now().UnixNano() / 1000000, + Data: data, + } + key := datastore.NewIncompleteKey(c.Ctx, "LogChunk", nil) + _, err := datastore.Put(c.Ctx, key, chunk) + return err +} diff --git a/db/schema.go b/db/schema.go index 374a2c1ff3..b1aa84523a 100644 --- a/db/schema.go +++ b/db/schema.go @@ -121,3 +121,17 @@ type Agent struct { type WhitelistedAccount struct { Email string } + +// LogChunk stores a raw chunk of log file indexed by file owner entity and +// timestamp. +type LogChunk struct { + // Points to the entity that owns this log chunk. + OwnerKey *datastore.Key + + // The time the chunk was logged. To get a complete log chunks are sorted + // by this field in descending order. + CreateTimestamp int64 + + // Log data. Must not exceed 1MB (enforced by Datastore). + Data []byte +}