Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flow control for async message consumers #74

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ jobs:
build:
docker:
# specify the version you desire here
- image: circleci/node:8.11
- image: circleci/node:10.0.0

working_directory: ~/repo

Expand Down
25 changes: 25 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,31 @@ async function logMessagesFromFooBar() {
logMessagesFromFooBar();
```

If you're dealing with large rosbag files and need flow control to process messages async, use the Async Generator `iterateMessages`.

```js
const { open } = require('rosbag');

async function readWithAsyncConsumer () {
const options = { topics: ['/unicorns'] };
const bag = await open("filename.bag");
for await (const msg of bag.iterateMessages(options)) {
// each message is read separately
// respecting async behavior in the for loop
await doSomethingAsync(msg);
}
}

readWithAsyncConsumer()
.then(() => console.log("DONE"));

```

That way you can also create a readable message stream
```js
require("stream").Readable.from(bag.iterateMessages(options))
```

## API

### Opening a new rosbag reader
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "rosbag",
"version": "2.6.1",
"version": "2.6.3",
"license": "Apache-2.0",
"repository": "cruise-automation/rosbag.js",
"dependencies": {
Expand All @@ -9,7 +9,7 @@
"int53": "1.0.0"
},
"engines": {
"node": ">=8.0.0"
"node": ">=10.0.0"
},
"main": "dist/node",
"browser": "dist/web",
Expand Down
10 changes: 9 additions & 1 deletion src/bag.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ export default class Bag {
}

async readMessages(opts: ReadOptions, callback: (msg: ReadResult<any>) => void) {
for await (const msg of this.iterateMessages(opts)) {
callback(msg);
}
}

async *iterateMessages(opts: ReadOptions): AsyncGenerator<ReadResult<any>, void, number> {
const connections = this.connections;

const startTime = opts.startTime || { sec: 0, nsec: 0 };
Expand Down Expand Up @@ -121,7 +127,9 @@ export default class Bag {
endTime,
decompress
);
messages.forEach((msg) => callback(parseMsg(msg, i)));
for (const msg of messages) {
if (yield parseMsg(msg, i)) return;
}
}
}
}
17 changes: 17 additions & 0 deletions src/bag.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,23 @@ describe("rosbag - high-level api", () => {
expect(messages).toHaveLength(9);
});

it("reads at consumer speed and abort reading on demand", async () => {
const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
const opts = { topics: ["/tf"] };
const bag = await Bag.open(getFixture());
const messages = bag.iterateMessages(opts || {});
const r1 = await messages.next();
expect(r1.value.timestamp.nsec).toBe(56251251);
expect(r1.done).toBe(false);
await delay(100);
const r2 = await messages.next();
expect(r2.value.timestamp.nsec).toBe(56262848);
expect(r2.done).toBe(false);
await delay(100);
const r3 = await messages.next(true);
expect(r3.done).toBe(true);
});

describe("compression", () => {
it("throws if compression scheme is not registered", async () => {
let errorThrown = false;
Expand Down
5 changes: 4 additions & 1 deletion src/fields.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import type { Time } from "./types";
// reads through a buffer and extracts { [key: string]: value: string }
// pairs - the buffer is expected to have length prefixed utf8 strings
// with a '=' separating the key and value
const EQUALS_CHARCODE = "=".charCodeAt(0);
export function extractFields(buffer: Buffer) {
if (buffer.length < 4) {
throw new Error("Header fields are truncated.");
Expand All @@ -27,8 +28,10 @@ export function extractFields(buffer: Buffer) {
throw new Error("Header fields are corrupt.");
}

// Passing a number into "indexOf" explicitly to avoid Buffer polyfill
// slow path. See issue #87.
const field = buffer.slice(i, i + length);
const index = field.indexOf("=");
const index = field.indexOf(EQUALS_CHARCODE);
if (index === -1) {
throw new Error("Header field is missing equals sign.");
}
Expand Down
32 changes: 13 additions & 19 deletions src/types.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,19 @@ export interface Filelike {
size(): number;
}

export type RosMsgField =
| {|
type: string,
name: string,
isConstant?: boolean,
isComplex?: boolean,
value?: mixed,
isArray?: false,
arrayLength?: void,
|}
| {|
type: string,
name: string,
isConstant?: boolean,
isComplex?: boolean,
value?: mixed,
isArray: true,
arrayLength: ?number,
|};
export type RosMsgField = {|
type: string,
name: string,
isComplex?: boolean,

// For arrays
isArray?: boolean,
arrayLength?: ?number,

// For constants
isConstant?: boolean,
value?: mixed,
|};

export type RosMsgDefinition = {|
name?: string,
Expand Down