Skip to content

Commit

Permalink
read FeedMessages instead of FeedEntitys 💥
Browse files Browse the repository at this point in the history
see also #1
  • Loading branch information
derhuerst committed Mar 13, 2020
1 parent 1715bfb commit b0377dc
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 36 deletions.
52 changes: 34 additions & 18 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
'use strict'

const {FeedHeader} = require('gtfs-rt-bindings')
const {Writable} = require('stream')
const createEntitiesStore = require('./lib/entities-store')

const {DIFFERENTIAL} = FeedHeader.Incrementality

const tripSignature = (u) => {
if (u.trip.trip_id) return u.trip.trip_id
if (u.trip.route_id && u.vehicle.id) {
Expand Down Expand Up @@ -32,24 +35,37 @@ const gtfsRtAsDump = (opt = {}) => {

const entitiesStore = createEntitiesStore(ttl, timestamp)

const write = (entity) => {
// If the entity is not being deleted, exactly one of 'trip_update', 'vehicle' and 'alert' fields should be populated.
// https://developers.google.com/transit/gtfs-realtime/reference#message-feedentity
let sig = null
if (entity.trip_update) {
sig = tripUpdateSignature(entity.trip_update)
} else if (entity.vehicle) {
sig = vehiclePositionSignature(entity.vehicle)
const write = (msg) => {
if (msg.header.gtfs_realtime_version !== '2.0') {
const err = new Error('FeedMessage GTFS-RT 2.0')
err.feedMessage = msg
throw err
}
if (msg.header.incrementality !== DIFFERENTIAL) {
const err = new Error('FeedMessage must be DIFFERENTIAL')
err.feedMessage = msg
throw err
}
// todo: alert

if (sig !== null) {
entitiesStore.put(sig, entity)
return;
for (const entity of msg.entity) {
// If the entity is not being deleted, exactly one of 'trip_update', 'vehicle' and 'alert' fields should be populated.
// https://developers.google.com/transit/gtfs-realtime/reference#message-feedentity
let sig = null
if (entity.trip_update) {
sig = tripUpdateSignature(entity.trip_update)
} else if (entity.vehicle) {
sig = vehiclePositionSignature(entity.vehicle)
}
// todo: alert, see #1

if (sig !== null) {
entitiesStore.put(sig, entity)
continue
}
const err = new Error('invalid/unsupported kind of FeedEntity')
err.feedEntity = entity
throw err
}
const err = new Error('invalid/unsupported kind of FeedEntity')
err.feedEntity = entity
throw err
}

let feedMessage = null
Expand All @@ -59,12 +75,12 @@ const gtfsRtAsDump = (opt = {}) => {

const out = new Writable({
objectMode: true,
write: (entity, _, cb) => {
write(entity)
write: (feedMsg, _, cb) => {
write(feedMsg)
cb(null)
},
writev: (chunks, cb) => {
for (const {chunk: entity} of chunks) write(entity)
for (const {chunk: feedMsg} of chunks) write(feedMsg)
cb(null)
},
final: (cb) => {
Expand Down
6 changes: 3 additions & 3 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ const toFull = toFullDataset({
})
toFull.on('error')

differentialFeedEntities.pipe(toFull)
differentialFeedMessages.pipe(toFull)
setInterval(() => {
console.log(toFull.asFeedMessage())
}, 5000)
```

`toFull` will be a [writable stream](https://nodejs.org/api/stream.html#stream_class_stream_writable) in [object mode](https://nodejs.org/api/stream.html#stream_object_mode) that expects JS objects in the [`FeedEntity`](https://developers.google.com/transit/gtfs-realtime/reference/#message-feedentity) structure/format.
`toFull` will be a [writable stream](https://nodejs.org/api/stream.html#stream_class_stream_writable) in [object mode](https://nodejs.org/api/stream.html#stream_object_mode) that expects JS objects in the [`FeedMessage`](https://developers.google.com/transit/gtfs-realtime/reference/#message-feedmessage) structure/format.

`toFull.asFeedMessage()` returns a [protocol-buffer-encoded](https://developers.google.com/protocol-buffers/docs/overview) [`FeedMessage`](https://developers.google.com/transit/gtfs-realtime/reference/#message-feedmessage) with all relevant `FeedEntity` that have been written into `toFull` so far.
`toFull.asFeedMessage()` returns a [protocol-buffer-encoded](https://developers.google.com/protocol-buffers/docs/overview) [`FeedMessage`](https://developers.google.com/transit/gtfs-realtime/reference/#message-feedmessage) with all relevant `FeedEntity`s that have been written into `toFull` so far.


## Contributing
Expand Down
7 changes: 3 additions & 4 deletions test/data.ndjson
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
{"id":"1","vehicle":{"trip":{"trip_id":"1|64512|1|86|12032020","route_id":"n18"},"vehicle":{"id":null,"label":"U Mohrenstr."},"position":{"latitude":52.5,"longitude":13.3},"stop_id":"900000005205","current_status":2}}
{"id":"2","vehicle":{"trip":{"trip_id":"1|64466|1|86|12032020","route_id":"n6"},"vehicle":{"id":null,"label":"U Alt-Mariendorf"},"position":{"latitude":52.1,"longitude":13.1},"stop_id":"900000012106","current_status":1}}
{"id":"3","trip_update":{"trip":{"trip_id":"1|25445|2|86|12032020","route_id":"n3"},"vehicle":{"id":"40813","label":"U Wittenbergplatz"},"stop_time_update":[{"stop_id":"900000050301","arrival":{"time":null,"delay":null},"departure":{"time":1584060600,"delay":0},"schedule_relationship":0},{"stop_id":"900000023354","arrival":{"time":1584062400,"delay":-60},"departure":{"time":1584062400,"delay":-60},"schedule_relationship":0},{"stop_id":"900000023203","arrival":{"time":1584062460,"delay":-60},"departure":{"time":1584062460,"delay":-60},"schedule_relationship":0},{"stop_id":"900000023204","arrival":{"time":1584062580,"delay":-60},"departure":{"time":1584062580,"delay":-60},"schedule_relationship":0},{"stop_id":"900000056101","arrival":{"time":1584062640,"delay":-60},"departure":{"time":null,"delay":null},"schedule_relationship":0}]}}
{"id":"4","vehicle":{"trip":{"trip_id":"1|64512|1|86|12032020","route_id":"n18"},"vehicle":{"id":null,"label":"U Mohrenstr."},"position":{"latitude":52.6,"longitude":13.4},"stop_id":"900000005205","current_status":2}}
{"header":{"gtfs_realtime_version":"2.0","incrementality":1,"timestamp":1},"entity":[{"id":"1","vehicle":{"trip":{"trip_id":"1|64512|1|86|12032020","route_id":"n18"},"vehicle":{"id":null,"label":"U Mohrenstr."},"position":{"latitude":52.5,"longitude":13.3},"stop_id":"900000005205","current_status":2}},{"id":"2","trip_update":{"trip":{"trip_id":"1|25445|2|86|12032020","route_id":"n3"},"vehicle":{"id":"40813","label":"U Wittenbergplatz"},"stop_time_update":[{"stop_id":"900000050301","arrival":{"time":null,"delay":null},"departure":{"time":1584060600,"delay":0},"schedule_relationship":0},{"stop_id":"900000023354","arrival":{"time":1584062400,"delay":-60},"departure":{"time":1584062400,"delay":-60},"schedule_relationship":0},{"stop_id":"900000023203","arrival":{"time":1584062460,"delay":-60},"departure":{"time":1584062460,"delay":-60},"schedule_relationship":0},{"stop_id":"900000023204","arrival":{"time":1584062580,"delay":-60},"departure":{"time":1584062580,"delay":-60},"schedule_relationship":0},{"stop_id":"900000056101","arrival":{"time":1584062640,"delay":-60},"departure":{"time":null,"delay":null},"schedule_relationship":0}]}}]}
{"header":{"gtfs_realtime_version":"2.0","incrementality":1,"timestamp":2},"entity":[{"id":"3","vehicle":{"trip":{"trip_id":"1|64466|1|86|12032020","route_id":"n6"},"vehicle":{"id":null,"label":"U Alt-Mariendorf"},"position":{"latitude":52.1,"longitude":13.1},"stop_id":"900000012106","current_status":1}}]}
{"header":{"gtfs_realtime_version":"2.0","incrementality":1,"timestamp":3},"entity":[{"id":"4","vehicle":{"trip":{"trip_id":"1|64512|1|86|12032020","route_id":"n18"},"vehicle":{"id":null,"label":"U Mohrenstr."},"position":{"latitude":52.6,"longitude":13.4},"stop_id":"900000005205","current_status":2}}]}
22 changes: 11 additions & 11 deletions test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,17 +95,17 @@ pump(

bufEqual(full.asFeedMessage(), Buffer.from(
`\
0a090a03322e301001180112520a0132224d0a1b0a15317c36343436367c317c38367\
c31323033323032302a026e36421212105520416c742d4d617269656e646f7266120a\
0d66665042159a9951413a0c39303030303030313231303620011289020a01331a830\
20a1b0a15317c32353434357c327c38367c31323033323032302a026e331a1a0a0534\
303831331211552057697474656e62657267706c61747a121c220c393030303030303\
53033303112001a08080010b8b1abf3052800122c220c393030303030303233333534\
120c08c4ffffff0f10c0bfabf3051a0c08c4ffffff0f10c0bfabf3052800122c220c3\
93030303030303233323033120c08c4ffffff0f10fcbfabf3051a0c08c4ffffff0f10\
fcbfabf3052800122c220c393030303030303233323034120c08c4ffffff0f10f4c0a\
bf3051a0c08c4ffffff0f10f4c0abf30528001220220c393030303030303536313031\
120c08c4ffffff0f10b0c1abf3051a002800124f0a0134224a0a1c0a15317c3634353\
0a090a03322e30100118011289020a01321a83020a1b0a15317c32353434357c327c3\
8367c31323033323032302a026e331a1a0a0534303831331211552057697474656e62\
657267706c61747a121c220c39303030303030353033303112001a08080010b8b1abf\
3052800122c220c393030303030303233333534120c08c4ffffff0f10c0bfabf3051a\
0c08c4ffffff0f10c0bfabf3052800122c220c393030303030303233323033120c08c\
4ffffff0f10fcbfabf3051a0c08c4ffffff0f10fcbfabf3052800122c220c39303030\
3030303233323034120c08c4ffffff0f10f4c0abf3051a0c08c4ffffff0f10f4c0abf\
30528001220220c393030303030303536313031120c08c4ffffff0f10b0c1abf3051a\
00280012520a0133224d0a1b0a15317c36343436367c317c38367c313230333230323\
02a026e36421212105520416c742d4d617269656e646f7266120a0d66665042159a99\
51413a0c3930303030303031323130362001124f0a0134224a0a1c0a15317c3634353\
1327c317c38367c31323033323032302a036e3138420e120c55204d6f6872656e7374\
722e120a0d6666524215666656413a0c3930303030303030353230352002`,
'hex'
Expand Down

0 comments on commit b0377dc

Please sign in to comment.