Skip to content
This repository has been archived by the owner on Nov 1, 2024. It is now read-only.

Commit

Permalink
add better error handling when stdin gives an error instead of EOF
Browse files Browse the repository at this point in the history
  • Loading branch information
jakemac53 committed Jan 19, 2017
1 parent 102adef commit 6bbffbc
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 20 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 0.1.2

* Add better handling for the case where stdin gives an error instead of an EOF.

## 0.1.1

* Export `AsyncMessageGrouper` and `SyncMessageGrouper` as part of the testing
Expand Down
31 changes: 19 additions & 12 deletions lib/src/async_message_grouper.dart
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,26 @@ class AsyncMessageGrouper implements MessageGrouper {

/// Returns the next full message that is received, or null if none are left.
Future<List<int>> get next async {
List<int> message;
while (
message == null && (_buffer.isNotEmpty || await _inputQueue.hasNext)) {
if (_buffer.isEmpty) _buffer.addAll(await _inputQueue.next);
var nextByte = _buffer.removeFirst();
if (nextByte == -1) return null;
message = _state.handleInput(nextByte);
try {
List<int> message;
while (message == null &&
(_buffer.isNotEmpty || await _inputQueue.hasNext)) {
if (_buffer.isEmpty) _buffer.addAll(await _inputQueue.next);
var nextByte = _buffer.removeFirst();
if (nextByte == -1) return null;
message = _state.handleInput(nextByte);
}

// If there is nothing left in the queue then cancel the subscription.
if (message == null) _inputQueue.cancel();

return message;
} catch (e) {
// It appears we sometimes get an exception instead of -1 as expected when
// stdin closes, this handles that in the same way (returning a null
// message)
return null;
}

// If there is nothing left in the queue then cancel the subscription.
if (message == null) _inputQueue.cancel();

return message;
}

/// Stop listening to the stream for further updates.
Expand Down
19 changes: 13 additions & 6 deletions lib/src/sync_message_grouper.dart
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,19 @@ class SyncMessageGrouper implements MessageGrouper {
///
/// Returns null at end of file.
List<int> get next {
List<int> message;
while (message == null) {
var nextByte = _stdin.readByteSync();
if (nextByte == -1) return null;
message = _state.handleInput(nextByte);
try {
List<int> message;
while (message == null) {
var nextByte = _stdin.readByteSync();
if (nextByte == -1) return null;
message = _state.handleInput(nextByte);
}
return message;
} catch (e) {
// It appears we sometimes get an exception instead of -1 as expected when
// stdin closes, this handles that in the same way (returning a null
// message)
return null;
}
return message;
}
}
1 change: 1 addition & 0 deletions lib/testing.dart
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class TestStdinSync implements TestStdin {
class TestStdinAsync implements TestStdin {
/// Controls the stream for async delivery of bytes.
final StreamController _controller = new StreamController();
StreamController get controller => _controller;

/// Adds all the [bytes] to this stream.
void addInputBytes(List<int> bytes) {
Expand Down
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: bazel_worker
version: 0.1.1
version: 0.1.2
description: Tools for creating a bazel persistent worker.
author: Dart Team <[email protected]>
homepage: https://github.com/dart-lang/bazel_worker
Expand Down
11 changes: 11 additions & 0 deletions test/message_grouper_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -137,4 +137,15 @@ void runTests(TestStdin stdinFactory(),
[30, 40]
]);
});

test('Handles the case when stdin gives an error instead of EOF', () async {
if (stdinStream is TestStdinSync) {
// Reading will now cause an error as pendingBytes is empty.
(stdinStream as TestStdinSync).pendingBytes.clear();
expect(messageGrouper.next, isNull);
} else if (stdinStream is TestStdinAsync) {
(stdinStream as TestStdinAsync).controller.addError('Error!');
expect(await messageGrouper.next, isNull);
}
});
}
16 changes: 15 additions & 1 deletion test/worker_loop_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';
import 'dart:io';

import 'package:test/test.dart';
Expand Down Expand Up @@ -35,7 +36,7 @@ void runTests/*<T extends TestWorkerConnection>*/(
TestWorkerLoop workerLoopFactory(/*=T*/ connection)) {
TestStdin stdinStream;
TestStdoutStream stdoutStream;
var /*=T*/ connection;
var/*=T*/ connection;
TestWorkerLoop workerLoop;

setUp(() {
Expand Down Expand Up @@ -86,4 +87,17 @@ void runTests/*<T extends TestWorkerConnection>*/(
stdinStream.close();
await workerLoop.run();
});

test('Stops if stdin gives an error instead of EOF', () async {
if (stdinStream is TestStdinSync) {
// Reading will now cause an error as pendingBytes is empty.
(stdinStream as TestStdinSync).pendingBytes.clear();
await workerLoop.run();
} else if (stdinStream is TestStdinAsync) {
var done = new Completer();
workerLoop.run().then((_) => done.complete(null));
(stdinStream as TestStdinAsync).controller.addError('Error!!');
await done.future;
}
});
}

0 comments on commit 6bbffbc

Please sign in to comment.