From 6bbffbcfe14f77070f09e96ed6b49922c1dfd48d Mon Sep 17 00:00:00 2001 From: Jacob MacDonald Date: Wed, 18 Jan 2017 09:55:27 -0800 Subject: [PATCH] add better error handling when stdin gives an error instead of EOF --- CHANGELOG.md | 4 ++++ lib/src/async_message_grouper.dart | 31 ++++++++++++++++++------------ lib/src/sync_message_grouper.dart | 19 ++++++++++++------ lib/testing.dart | 1 + pubspec.yaml | 2 +- test/message_grouper_test.dart | 11 +++++++++++ test/worker_loop_test.dart | 16 ++++++++++++++- 7 files changed, 64 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index dd62957..d81018c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## 0.1.2 + +* Add better handling for the case where stdin gives an error instead of an EOF. + ## 0.1.1 * Export `AsyncMessageGrouper` and `SyncMessageGrouper` as part of the testing diff --git a/lib/src/async_message_grouper.dart b/lib/src/async_message_grouper.dart index 448f51e..93caf49 100644 --- a/lib/src/async_message_grouper.dart +++ b/lib/src/async_message_grouper.dart @@ -27,19 +27,26 @@ class AsyncMessageGrouper implements MessageGrouper { /// Returns the next full message that is received, or null if none are left. Future> get next async { - List message; - while ( - message == null && (_buffer.isNotEmpty || await _inputQueue.hasNext)) { - if (_buffer.isEmpty) _buffer.addAll(await _inputQueue.next); - var nextByte = _buffer.removeFirst(); - if (nextByte == -1) return null; - message = _state.handleInput(nextByte); + try { + List message; + while (message == null && + (_buffer.isNotEmpty || await _inputQueue.hasNext)) { + if (_buffer.isEmpty) _buffer.addAll(await _inputQueue.next); + var nextByte = _buffer.removeFirst(); + if (nextByte == -1) return null; + message = _state.handleInput(nextByte); + } + + // If there is nothing left in the queue then cancel the subscription. + if (message == null) _inputQueue.cancel(); + + return message; + } catch (e) { + // It appears we sometimes get an exception instead of -1 as expected when + // stdin closes, this handles that in the same way (returning a null + // message) + return null; } - - // If there is nothing left in the queue then cancel the subscription. - if (message == null) _inputQueue.cancel(); - - return message; } /// Stop listening to the stream for further updates. diff --git a/lib/src/sync_message_grouper.dart b/lib/src/sync_message_grouper.dart index 50d5dca..20a1139 100644 --- a/lib/src/sync_message_grouper.dart +++ b/lib/src/sync_message_grouper.dart @@ -18,12 +18,19 @@ class SyncMessageGrouper implements MessageGrouper { /// /// Returns null at end of file. List get next { - List message; - while (message == null) { - var nextByte = _stdin.readByteSync(); - if (nextByte == -1) return null; - message = _state.handleInput(nextByte); + try { + List message; + while (message == null) { + var nextByte = _stdin.readByteSync(); + if (nextByte == -1) return null; + message = _state.handleInput(nextByte); + } + return message; + } catch (e) { + // It appears we sometimes get an exception instead of -1 as expected when + // stdin closes, this handles that in the same way (returning a null + // message) + return null; } - return message; } } diff --git a/lib/testing.dart b/lib/testing.dart index ecc3142..882097f 100644 --- a/lib/testing.dart +++ b/lib/testing.dart @@ -51,6 +51,7 @@ class TestStdinSync implements TestStdin { class TestStdinAsync implements TestStdin { /// Controls the stream for async delivery of bytes. final StreamController _controller = new StreamController(); + StreamController get controller => _controller; /// Adds all the [bytes] to this stream. void addInputBytes(List bytes) { diff --git a/pubspec.yaml b/pubspec.yaml index 241861f..e7e589e 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,5 +1,5 @@ name: bazel_worker -version: 0.1.1 +version: 0.1.2 description: Tools for creating a bazel persistent worker. author: Dart Team homepage: https://github.com/dart-lang/bazel_worker diff --git a/test/message_grouper_test.dart b/test/message_grouper_test.dart index 8a975fb..39dcb9e 100644 --- a/test/message_grouper_test.dart +++ b/test/message_grouper_test.dart @@ -137,4 +137,15 @@ void runTests(TestStdin stdinFactory(), [30, 40] ]); }); + + test('Handles the case when stdin gives an error instead of EOF', () async { + if (stdinStream is TestStdinSync) { + // Reading will now cause an error as pendingBytes is empty. + (stdinStream as TestStdinSync).pendingBytes.clear(); + expect(messageGrouper.next, isNull); + } else if (stdinStream is TestStdinAsync) { + (stdinStream as TestStdinAsync).controller.addError('Error!'); + expect(await messageGrouper.next, isNull); + } + }); } diff --git a/test/worker_loop_test.dart b/test/worker_loop_test.dart index 1c27d5d..ac4dd11 100644 --- a/test/worker_loop_test.dart +++ b/test/worker_loop_test.dart @@ -2,6 +2,7 @@ // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. +import 'dart:async'; import 'dart:io'; import 'package:test/test.dart'; @@ -35,7 +36,7 @@ void runTests/**/( TestWorkerLoop workerLoopFactory(/*=T*/ connection)) { TestStdin stdinStream; TestStdoutStream stdoutStream; - var /*=T*/ connection; + var/*=T*/ connection; TestWorkerLoop workerLoop; setUp(() { @@ -86,4 +87,17 @@ void runTests/**/( stdinStream.close(); await workerLoop.run(); }); + + test('Stops if stdin gives an error instead of EOF', () async { + if (stdinStream is TestStdinSync) { + // Reading will now cause an error as pendingBytes is empty. + (stdinStream as TestStdinSync).pendingBytes.clear(); + await workerLoop.run(); + } else if (stdinStream is TestStdinAsync) { + var done = new Completer(); + workerLoop.run().then((_) => done.complete(null)); + (stdinStream as TestStdinAsync).controller.addError('Error!!'); + await done.future; + } + }); }