diff --git a/agent/lib/src/adb.dart b/agent/lib/src/adb.dart index 045020751d..308c7af3ba 100644 --- a/agent/lib/src/adb.dart +++ b/agent/lib/src/adb.dart @@ -47,7 +47,7 @@ class Adb { static final RegExp _kDeviceRegex = new RegExp(r'^(\S+)\s+(\S+)(.*)'); static Future> get deviceIds async { - List output = (await eval(config.adbPath, ['devices', '-l'], canFail: false, onKill: _adbTimeout())) + List output = (await eval(config.adbPath, ['devices', '-l'], canFail: false)) .trim().split('\n'); List results = []; for (String line in output) { @@ -122,13 +122,11 @@ class Adb { /// Executes [command] on `adb shell` and returns its exit code. Future shellExec(String command, List arguments, {Map env}) async { - await exec(config.adbPath, ['shell', command]..addAll(arguments), env: env, canFail: false, onKill: _adbTimeout()); + await exec(config.adbPath, ['shell', command]..addAll(arguments), env: env, canFail: false); } /// Executes [command] on `adb shell` and returns its standard output as a [String]. Future shellEval(String command, List arguments, {Map env}) { - return eval(config.adbPath, ['shell', command]..addAll(arguments), env: env, canFail: false, onKill: _adbTimeout()); + return eval(config.adbPath, ['shell', command]..addAll(arguments), env: env, canFail: false); } - - static Future _adbTimeout() => new Future.delayed(const Duration(seconds: 5)); } diff --git a/agent/lib/src/analysis.dart b/agent/lib/src/analysis.dart index 6b7431403c..f228010ad2 100644 --- a/agent/lib/src/analysis.dart +++ b/agent/lib/src/analysis.dart @@ -43,19 +43,19 @@ abstract class AnalyzerTask extends Task { class AnalyzerCliTask extends AnalyzerTask { AnalyzerCliTask(String sdk, String commit, DateTime timestamp) : super('analyzer_cli__analysis_time') { - this.benchmark = new FlutterAnalyzeBenchmark(onCancel, sdk, commit, timestamp); + this.benchmark = new FlutterAnalyzeBenchmark(sdk, commit, timestamp); } } class AnalyzerServerTask extends AnalyzerTask { AnalyzerServerTask(String sdk, String commit, DateTime timestamp) : super('analyzer_server__analysis_time') { - this.benchmark = new FlutterAnalyzeAppBenchmark(onCancel, sdk, commit, timestamp); + this.benchmark = new FlutterAnalyzeAppBenchmark(sdk, commit, timestamp); } } class FlutterAnalyzeBenchmark extends Benchmark { - FlutterAnalyzeBenchmark(Future onCancel, this.sdk, this.commit, this.timestamp) - : super('flutter analyze --flutter-repo', onCancel); + FlutterAnalyzeBenchmark(this.sdk, this.commit, this.timestamp) + : super('flutter analyze --flutter-repo'); final String sdk; final String commit; @@ -70,15 +70,15 @@ class FlutterAnalyzeBenchmark extends Benchmark { Future run() async { rm(benchmarkFile); await inDirectory(config.flutterDirectory, () async { - await flutter('analyze', onCancel, options: ['--flutter-repo', '--benchmark']); + await flutter('analyze', options: ['--flutter-repo', '--benchmark']); }); return addBuildInfo(benchmarkFile, timestamp: timestamp, expected: 25.0, sdk: sdk, commit: commit); } } class FlutterAnalyzeAppBenchmark extends Benchmark { - FlutterAnalyzeAppBenchmark(Future onCancel, this.sdk, this.commit, this.timestamp) - : super('analysis server mega_gallery', onCancel); + FlutterAnalyzeAppBenchmark(this.sdk, this.commit, this.timestamp) + : super('analysis server mega_gallery'); final String sdk; final String commit; @@ -92,7 +92,7 @@ class FlutterAnalyzeAppBenchmark extends Benchmark { Future init() { return inDirectory(config.flutterDirectory, () async { - await dart(['dev/tools/mega_gallery.dart'], onCancel); + await dart(['dev/tools/mega_gallery.dart']); }); } @@ -100,7 +100,7 @@ class FlutterAnalyzeAppBenchmark extends Benchmark { Future run() async { rm(benchmarkFile); await inDirectory(megaDir, () async { - await flutter('analyze', onCancel, options: ['--watch', '--benchmark']); + await flutter('analyze', options: ['--watch', '--benchmark']); }); return addBuildInfo(benchmarkFile, timestamp: timestamp, expected: 10.0, sdk: sdk, commit: commit); } diff --git a/agent/lib/src/benchmarks.dart b/agent/lib/src/benchmarks.dart index 90900a32df..5877218633 100644 --- a/agent/lib/src/benchmarks.dart +++ b/agent/lib/src/benchmarks.dart @@ -7,10 +7,9 @@ import 'dart:async'; import 'framework.dart'; abstract class Benchmark { - Benchmark(this.name, this.onCancel); + Benchmark(this.name); final String name; - final Future onCancel; TaskResultData bestResult; diff --git a/agent/lib/src/commands/ci.dart b/agent/lib/src/commands/ci.dart index 0d63484a7c..5c9be096f7 100644 --- a/agent/lib/src/commands/ci.dart +++ b/agent/lib/src/commands/ci.dart @@ -11,17 +11,13 @@ import 'package:args/args.dart'; import '../adb.dart'; import '../agent.dart'; import '../firebase.dart'; +import '../framework.dart'; import '../utils.dart'; /// Agents periodically poll the server for more tasks. This sleep period is /// used to prevent us from DDoS-ing the server. const Duration _sleepBetweenBuilds = const Duration(seconds: 10); -/// Maximum amount of time a single task is allowed to take. -/// -/// After that the task is killed and reported as failed. -const Duration _taskTimeout = const Duration(minutes: 10); - /// Runs the agent in continuous integration mode. /// /// In this mode the agent runs in an infinite loop, continuously asking for @@ -44,7 +40,7 @@ class ContinuousIntegrationCommand extends Command { if (task != null) { // Sync flutter outside of the task so it does not contribute to // the task timeout. - await getFlutterAt(task.revision); + await getFlutterAt(task.revision).timeout(const Duration(minutes: 10)); // No need to pass revision as repo syncing is done here. List runnerArgs = [ @@ -55,12 +51,11 @@ class ContinuousIntegrationCommand extends Command { Process proc = await startProcess( dartBin, - [config.runTaskFile.path]..addAll(runnerArgs), - onKill: new Future.delayed(_taskTimeout) - ); + [config.runTaskFile.path]..addAll(runnerArgs) + ).timeout(const Duration(minutes: 1)); _logStandardStreams(task, proc); - await proc.exitCode; + await proc.exitCode.timeout(taskTimeoutWithGracePeriod); } } catch(error, stackTrace) { print('ERROR: $error\n$stackTrace'); @@ -68,6 +63,8 @@ class ContinuousIntegrationCommand extends Command { } } catch(error, stackTrace) { print('ERROR: $error\n$stackTrace'); + } finally { + await forceQuitRunningProcesses(); } // TODO(yjbanov): report health status after running the task diff --git a/agent/lib/src/commands/run.dart b/agent/lib/src/commands/run.dart index 7ef44a9d90..969fe01bbd 100644 --- a/agent/lib/src/commands/run.dart +++ b/agent/lib/src/commands/run.dart @@ -68,13 +68,15 @@ class RunCommand extends Command { await agent.updateTaskStatus(task.key, 'Failed'); } } - }); + }).timeout(taskTimeout); } catch(error, chain) { // TODO(yjbanov): upload logs print('Caught: $error\n${(chain as Chain).terse}'); if (task.key != null) await agent.updateTaskStatus(task.key, 'Failed'); - exit(1); + exitCode = 1; + } finally { + await forceQuitRunningProcesses(); } } } diff --git a/agent/lib/src/framework.dart b/agent/lib/src/framework.dart index 497823a3ef..ff18ba235d 100644 --- a/agent/lib/src/framework.dart +++ b/agent/lib/src/framework.dart @@ -11,10 +11,11 @@ import 'utils.dart'; /// Maximum amount of time a single task is allowed to take to run. /// /// If exceeded the task is considered to have failed. -const Duration taskTimeout = const Duration(minutes: 7); +const Duration taskTimeout = const Duration(minutes: 10); -/// Maximum amount of time cancelling a task is allowed to take. -const Duration cancelTimeout = const Duration(minutes: 1); +/// Slightly longer than [taskTimeout] that gives the task runner a chance to +/// clean-up before forcefully quitting it. +const Duration taskTimeoutWithGracePeriod = const Duration(minutes: 11); /// Represents a unit of work performed on the dashboard build box that can /// succeed, fail and be retried independently of others. @@ -26,23 +27,8 @@ abstract class Task { /// This should be as unique as possible to avoid confusion. final String name; - final Completer _cancelCompleter = new Completer(); - - /// Signals that the task must be cancelled immediately. - /// - /// Task implementations must obey this signal by killing pending processes - /// and reclaiming subscriptions to streams, such as network requests. - /// - /// The signal may be sent any time whether the task is running or - /// not and must be robust enough to handle it without throwing. - Future get onCancel => _cancelCompleter.future; - /// Performs actual work. Future run(); - - void cancel() { - _cancelCompleter.complete(); - } } /// Runs a queue of tasks; collects results. @@ -65,7 +51,7 @@ class TaskRunner { section('Running task ${task.name}'); TaskResult result; try { - TaskResultData data = await task.run().timeout(taskTimeout); + TaskResultData data = await task.run(); if (data != null) result = new TaskResult.success(task, revision, data); else @@ -75,14 +61,6 @@ class TaskRunner { if (taskErrorStack != null) { message += '\n\n$taskErrorStack'; } - try { - task.cancel(); - } catch (cancelError, cancelErrorStack) { - message += '\n\nAttempted to cancel tasks, but failed due to: $cancelError'; - if (cancelErrorStack != null) { - message += '\n\n$cancelErrorStack'; - } - } print(''); print(message); result = new TaskResult.failure(task, revision, message); diff --git a/agent/lib/src/gallery.dart b/agent/lib/src/gallery.dart index 447aded40f..4895a07155 100644 --- a/agent/lib/src/gallery.dart +++ b/agent/lib/src/gallery.dart @@ -21,8 +21,8 @@ class GalleryTransitionTest extends Task { device.unlock(); Directory galleryDirectory = dir('${config.flutterDirectory.path}/examples/flutter_gallery'); await inDirectory(galleryDirectory, () async { - await pub('get', onCancel); - await flutter('drive', onCancel, options: [ + await pub('get'); + await flutter('drive', options: [ '--profile', '--trace-startup', '-t', diff --git a/agent/lib/src/perf_tests.dart b/agent/lib/src/perf_tests.dart index b00aa0e0eb..4c60f44ae4 100644 --- a/agent/lib/src/perf_tests.dart +++ b/agent/lib/src/perf_tests.dart @@ -45,8 +45,8 @@ class StartupTest extends Task { return await inDirectory(testDirectory, () async { Adb device = await adb(); device.unlock(); - await pub('get', onCancel); - await flutter('run', onCancel, options: [ + await pub('get'); + await flutter('run', options: [ '--profile', '--trace-startup', '-d', @@ -77,8 +77,8 @@ class PerfTest extends Task { return inDirectory(testDirectory, () async { Adb device = await adb(); device.unlock(); - await pub('get', onCancel); - await flutter('drive', onCancel, options: [ + await pub('get'); + await flutter('drive', options: [ '--profile', '--trace-startup', // Enables "endless" timeline event buffering. '-t', @@ -107,10 +107,10 @@ class BuildTest extends Task { return await inDirectory(testDirectory, () async { Adb device = await adb(); device.unlock(); - await pub('get', onCancel); + await pub('get'); var watch = new Stopwatch()..start(); - await flutter('build', onCancel, options: [ + await flutter('build', options: [ 'aot', '--profile', '--no-pub', diff --git a/agent/lib/src/refresh.dart b/agent/lib/src/refresh.dart index 056945b913..023b704d68 100644 --- a/agent/lib/src/refresh.dart +++ b/agent/lib/src/refresh.dart @@ -31,7 +31,7 @@ class EditRefreshTask extends Task { Future run() async { Adb device = await adb(); device.unlock(); - Benchmark benchmark = new EditRefreshBenchmark(commit, timestamp, onCancel); + Benchmark benchmark = new EditRefreshBenchmark(commit, timestamp); section(benchmark.name); await runBenchmark(benchmark, iterations: 3, warmUpBenchmark: true); return benchmark.bestResult; @@ -39,8 +39,8 @@ class EditRefreshTask extends Task { } class EditRefreshBenchmark extends Benchmark { - EditRefreshBenchmark(this.commit, this.timestamp, Future onCancel) - : super('edit refresh', onCancel); + EditRefreshBenchmark(this.commit, this.timestamp) + : super('edit refresh'); final String commit; final DateTime timestamp; @@ -53,7 +53,7 @@ class EditRefreshBenchmark extends Benchmark { Future init() { return inDirectory(config.flutterDirectory, () async { - await dart(['dev/tools/mega_gallery.dart'], onCancel); + await dart(['dev/tools/mega_gallery.dart']); }); } @@ -63,7 +63,7 @@ class EditRefreshBenchmark extends Benchmark { rm(benchmarkFile); int exitCode = await inDirectory(megaDir, () async { return await flutter( - 'run', onCancel, options: ['-d', device.deviceId, '--resident', '--benchmark'], canFail: true + 'run', options: ['-d', device.deviceId, '--resident', '--benchmark'], canFail: true ); }); if (exitCode != 0) diff --git a/agent/lib/src/size_tests.dart b/agent/lib/src/size_tests.dart index e398a30c67..a6beee88b7 100644 --- a/agent/lib/src/size_tests.dart +++ b/agent/lib/src/size_tests.dart @@ -24,15 +24,15 @@ class BasicMaterialAppSizeTest extends Task { int apkSizeInBytes; await inDirectory(Directory.systemTemp, () async { - await flutter('create', onCancel, options: [sampleAppName]); + await flutter('create', options: [sampleAppName]); if (!(await sampleDir.exists())) throw 'Failed to create sample Flutter app in ${sampleDir.path}'; await inDirectory(sampleDir, () async { - await pub('get', onCancel); - await flutter('build', onCancel, options: ['clean']); - await flutter('build', onCancel, options: ['apk', '--release']); + await pub('get'); + await flutter('build', options: ['clean']); + await flutter('build', options: ['apk', '--release']); apkSizeInBytes = await file('${sampleDir.path}/build/app.apk').length(); }); }); diff --git a/agent/lib/src/utils.dart b/agent/lib/src/utils.dart index 0cb96f4121..de2f20a033 100644 --- a/agent/lib/src/utils.dart +++ b/agent/lib/src/utils.dart @@ -18,6 +18,26 @@ String cwd = Directory.current.path; Config _config; Config get config => _config; +List _runningProcesses = []; + +class ProcessInfo { + ProcessInfo(this.command, this.process); + + final DateTime startTime = new DateTime.now(); + final String command; + final Process process; + + @override + String toString() { + return +''' + command : $command + started : $startTime + pid : ${process.pid} +'''.trim(); + } +} + class BuildFailedError extends Error { BuildFailedError(this.message); @@ -111,33 +131,38 @@ Future getFlutterRepoCommitTimestamp(String commit) { } Future startProcess(String executable, List arguments, - {Map env, Future onKill}) async { - print('Executing: $executable ${arguments?.join(' ') ?? ""}'); + {Map env}) async { + String command = '$executable ${arguments?.join(" ") ?? ""}'; + print('Executing: $command'); Process proc = await Process.start(executable, arguments, environment: env, workingDirectory: cwd); + ProcessInfo procInfo = new ProcessInfo(command, proc); + _runningProcesses.add(procInfo); - if (onKill != null) { - bool processExited = false; + proc.exitCode.then((_) { + _runningProcesses.remove(procInfo); + }); - proc.exitCode.then((_) { - processExited = true; - }); + return proc; +} - onKill.then((_) { - if (!processExited) { - print('Caught signal to kill process (PID: ${proc.pid}): $executable ${arguments.join(' ')}'); - bool killed = proc.kill(ProcessSignal.SIGKILL); - print('Process ${killed ? "was killed successfully" : "could not be killed"}.'); - } - }); - } +Future forceQuitRunningProcesses() async { + // Give normally quitting processes a chance to report their exit code. + await new Future.delayed(const Duration(seconds: 1)); - return proc; + // Whatever's left, kill it. + for (ProcessInfo p in _runningProcesses) { + print('Force quitting process:\n$p'); + if (!p.process.kill()) { + print('Failed to force quit process'); + } + } + _runningProcesses.clear(); } /// Executes a command and returns its exit code. Future exec(String executable, List arguments, - {Map env, bool canFail: false, Future onKill}) async { - Process proc = await startProcess(executable, arguments, env: env, onKill: onKill); + {Map env, bool canFail: false}) async { + Process proc = await startProcess(executable, arguments, env: env); proc.stdout .transform(UTF8.decoder) @@ -160,8 +185,8 @@ Future exec(String executable, List arguments, /// /// Standard error is redirected to the current process' standard error stream. Future eval(String executable, List arguments, - {Map env, bool canFail: false, Future onKill}) async { - Process proc = await startProcess(executable, arguments, env: env, onKill: onKill); + {Map env, bool canFail: false}) async { + Process proc = await startProcess(executable, arguments, env: env); proc.stderr.listen((List data) { stderr.write(data); }); @@ -174,25 +199,20 @@ Future eval(String executable, List arguments, return output.trimRight(); } -Future flutter(String command, Future onKill, {List options: const[], bool canFail: false}) { - if (onKill == null) { - throw 'flutter command must obey onKill signal'; - } - +Future flutter(String command, {List options: const[], bool canFail: false}) { List args = [command] ..addAll(options); - return exec(path.join(config.flutterDirectory.path, 'bin', 'flutter'), args, canFail: canFail, onKill: onKill); + return exec(path.join(config.flutterDirectory.path, 'bin', 'flutter'), args, canFail: canFail); } String get dartBin => path.join(config.flutterDirectory.path, 'bin/cache/dart-sdk/bin/dart'); -Future dart(List args, Future onKill) => exec(dartBin, args, onKill: onKill); +Future dart(List args) => exec(dartBin, args); -Future pub(String command, Future onKill) { +Future pub(String command) { return exec( path.join(config.flutterDirectory.path, 'bin/cache/dart-sdk/bin/pub'), - [command], - onKill: onKill + [command] ); } @@ -337,23 +357,21 @@ Future getFlutter(String revision) async { rrm(config.flutterDirectory); } - Future timeout = new Future.delayed(const Duration(minutes: 10)); - await inDirectory(config.flutterDirectory.parent, () async { - await exec('git', ['clone', 'https://github.com/flutter/flutter.git'], onKill: timeout); + await exec('git', ['clone', 'https://github.com/flutter/flutter.git']); }); await inDirectory(config.flutterDirectory, () async { - await exec('git', ['checkout', revision], onKill: timeout); + await exec('git', ['checkout', revision]); }); - await flutter('config', timeout, options: ['--no-analytics']); + await flutter('config', options: ['--no-analytics']); section('flutter doctor'); - await flutter('doctor', timeout); + await flutter('doctor'); section('flutter update-packages'); - await flutter('update-packages', timeout); + await flutter('update-packages'); return true; }