From b0377dc577997cb119eab1b58410666e18d74f4c Mon Sep 17 00:00:00 2001 From: Jannis R Date: Fri, 13 Mar 2020 11:38:33 +0100 Subject: [PATCH] read FeedMessages instead of FeedEntitys :boom: see also #1 --- index.js | 52 +++++++++++++++++++++++++++++++----------------- readme.md | 6 +++--- test/data.ndjson | 7 +++---- test/index.js | 22 ++++++++++---------- 4 files changed, 51 insertions(+), 36 deletions(-) diff --git a/index.js b/index.js index a9e7372..3525c0d 100644 --- a/index.js +++ b/index.js @@ -1,8 +1,11 @@ 'use strict' +const {FeedHeader} = require('gtfs-rt-bindings') const {Writable} = require('stream') const createEntitiesStore = require('./lib/entities-store') +const {DIFFERENTIAL} = FeedHeader.Incrementality + const tripSignature = (u) => { if (u.trip.trip_id) return u.trip.trip_id if (u.trip.route_id && u.vehicle.id) { @@ -32,24 +35,37 @@ const gtfsRtAsDump = (opt = {}) => { const entitiesStore = createEntitiesStore(ttl, timestamp) - const write = (entity) => { - // If the entity is not being deleted, exactly one of 'trip_update', 'vehicle' and 'alert' fields should be populated. - // https://developers.google.com/transit/gtfs-realtime/reference#message-feedentity - let sig = null - if (entity.trip_update) { - sig = tripUpdateSignature(entity.trip_update) - } else if (entity.vehicle) { - sig = vehiclePositionSignature(entity.vehicle) + const write = (msg) => { + if (msg.header.gtfs_realtime_version !== '2.0') { + const err = new Error('FeedMessage GTFS-RT 2.0') + err.feedMessage = msg + throw err + } + if (msg.header.incrementality !== DIFFERENTIAL) { + const err = new Error('FeedMessage must be DIFFERENTIAL') + err.feedMessage = msg + throw err } - // todo: alert - if (sig !== null) { - entitiesStore.put(sig, entity) - return; + for (const entity of msg.entity) { + // If the entity is not being deleted, exactly one of 'trip_update', 'vehicle' and 'alert' fields should be populated. + // https://developers.google.com/transit/gtfs-realtime/reference#message-feedentity + let sig = null + if (entity.trip_update) { + sig = tripUpdateSignature(entity.trip_update) + } else if (entity.vehicle) { + sig = vehiclePositionSignature(entity.vehicle) + } + // todo: alert, see #1 + + if (sig !== null) { + entitiesStore.put(sig, entity) + continue + } + const err = new Error('invalid/unsupported kind of FeedEntity') + err.feedEntity = entity + throw err } - const err = new Error('invalid/unsupported kind of FeedEntity') - err.feedEntity = entity - throw err } let feedMessage = null @@ -59,12 +75,12 @@ const gtfsRtAsDump = (opt = {}) => { const out = new Writable({ objectMode: true, - write: (entity, _, cb) => { - write(entity) + write: (feedMsg, _, cb) => { + write(feedMsg) cb(null) }, writev: (chunks, cb) => { - for (const {chunk: entity} of chunks) write(entity) + for (const {chunk: feedMsg} of chunks) write(feedMsg) cb(null) }, final: (cb) => { diff --git a/readme.md b/readme.md index 1d66dcc..7fb10cc 100644 --- a/readme.md +++ b/readme.md @@ -28,15 +28,15 @@ const toFull = toFullDataset({ }) toFull.on('error') -differentialFeedEntities.pipe(toFull) +differentialFeedMessages.pipe(toFull) setInterval(() => { console.log(toFull.asFeedMessage()) }, 5000) ``` -`toFull` will be a [writable stream](https://nodejs.org/api/stream.html#stream_class_stream_writable) in [object mode](https://nodejs.org/api/stream.html#stream_object_mode) that expects JS objects in the [`FeedEntity`](https://developers.google.com/transit/gtfs-realtime/reference/#message-feedentity) structure/format. +`toFull` will be a [writable stream](https://nodejs.org/api/stream.html#stream_class_stream_writable) in [object mode](https://nodejs.org/api/stream.html#stream_object_mode) that expects JS objects in the [`FeedMessage`](https://developers.google.com/transit/gtfs-realtime/reference/#message-feedmessage) structure/format. -`toFull.asFeedMessage()` returns a [protocol-buffer-encoded](https://developers.google.com/protocol-buffers/docs/overview) [`FeedMessage`](https://developers.google.com/transit/gtfs-realtime/reference/#message-feedmessage) with all relevant `FeedEntity` that have been written into `toFull` so far. +`toFull.asFeedMessage()` returns a [protocol-buffer-encoded](https://developers.google.com/protocol-buffers/docs/overview) [`FeedMessage`](https://developers.google.com/transit/gtfs-realtime/reference/#message-feedmessage) with all relevant `FeedEntity`s that have been written into `toFull` so far. ## Contributing diff --git a/test/data.ndjson b/test/data.ndjson index 7764b59..1146de0 100644 --- a/test/data.ndjson +++ b/test/data.ndjson @@ -1,4 +1,3 @@ -{"id":"1","vehicle":{"trip":{"trip_id":"1|64512|1|86|12032020","route_id":"n18"},"vehicle":{"id":null,"label":"U Mohrenstr."},"position":{"latitude":52.5,"longitude":13.3},"stop_id":"900000005205","current_status":2}} -{"id":"2","vehicle":{"trip":{"trip_id":"1|64466|1|86|12032020","route_id":"n6"},"vehicle":{"id":null,"label":"U Alt-Mariendorf"},"position":{"latitude":52.1,"longitude":13.1},"stop_id":"900000012106","current_status":1}} -{"id":"3","trip_update":{"trip":{"trip_id":"1|25445|2|86|12032020","route_id":"n3"},"vehicle":{"id":"40813","label":"U Wittenbergplatz"},"stop_time_update":[{"stop_id":"900000050301","arrival":{"time":null,"delay":null},"departure":{"time":1584060600,"delay":0},"schedule_relationship":0},{"stop_id":"900000023354","arrival":{"time":1584062400,"delay":-60},"departure":{"time":1584062400,"delay":-60},"schedule_relationship":0},{"stop_id":"900000023203","arrival":{"time":1584062460,"delay":-60},"departure":{"time":1584062460,"delay":-60},"schedule_relationship":0},{"stop_id":"900000023204","arrival":{"time":1584062580,"delay":-60},"departure":{"time":1584062580,"delay":-60},"schedule_relationship":0},{"stop_id":"900000056101","arrival":{"time":1584062640,"delay":-60},"departure":{"time":null,"delay":null},"schedule_relationship":0}]}} -{"id":"4","vehicle":{"trip":{"trip_id":"1|64512|1|86|12032020","route_id":"n18"},"vehicle":{"id":null,"label":"U Mohrenstr."},"position":{"latitude":52.6,"longitude":13.4},"stop_id":"900000005205","current_status":2}} +{"header":{"gtfs_realtime_version":"2.0","incrementality":1,"timestamp":1},"entity":[{"id":"1","vehicle":{"trip":{"trip_id":"1|64512|1|86|12032020","route_id":"n18"},"vehicle":{"id":null,"label":"U Mohrenstr."},"position":{"latitude":52.5,"longitude":13.3},"stop_id":"900000005205","current_status":2}},{"id":"2","trip_update":{"trip":{"trip_id":"1|25445|2|86|12032020","route_id":"n3"},"vehicle":{"id":"40813","label":"U Wittenbergplatz"},"stop_time_update":[{"stop_id":"900000050301","arrival":{"time":null,"delay":null},"departure":{"time":1584060600,"delay":0},"schedule_relationship":0},{"stop_id":"900000023354","arrival":{"time":1584062400,"delay":-60},"departure":{"time":1584062400,"delay":-60},"schedule_relationship":0},{"stop_id":"900000023203","arrival":{"time":1584062460,"delay":-60},"departure":{"time":1584062460,"delay":-60},"schedule_relationship":0},{"stop_id":"900000023204","arrival":{"time":1584062580,"delay":-60},"departure":{"time":1584062580,"delay":-60},"schedule_relationship":0},{"stop_id":"900000056101","arrival":{"time":1584062640,"delay":-60},"departure":{"time":null,"delay":null},"schedule_relationship":0}]}}]} +{"header":{"gtfs_realtime_version":"2.0","incrementality":1,"timestamp":2},"entity":[{"id":"3","vehicle":{"trip":{"trip_id":"1|64466|1|86|12032020","route_id":"n6"},"vehicle":{"id":null,"label":"U Alt-Mariendorf"},"position":{"latitude":52.1,"longitude":13.1},"stop_id":"900000012106","current_status":1}}]} +{"header":{"gtfs_realtime_version":"2.0","incrementality":1,"timestamp":3},"entity":[{"id":"4","vehicle":{"trip":{"trip_id":"1|64512|1|86|12032020","route_id":"n18"},"vehicle":{"id":null,"label":"U Mohrenstr."},"position":{"latitude":52.6,"longitude":13.4},"stop_id":"900000005205","current_status":2}}]} diff --git a/test/index.js b/test/index.js index 19e0413..4e48957 100644 --- a/test/index.js +++ b/test/index.js @@ -95,17 +95,17 @@ pump( bufEqual(full.asFeedMessage(), Buffer.from( `\ -0a090a03322e301001180112520a0132224d0a1b0a15317c36343436367c317c38367\ -c31323033323032302a026e36421212105520416c742d4d617269656e646f7266120a\ -0d66665042159a9951413a0c39303030303030313231303620011289020a01331a830\ -20a1b0a15317c32353434357c327c38367c31323033323032302a026e331a1a0a0534\ -303831331211552057697474656e62657267706c61747a121c220c393030303030303\ -53033303112001a08080010b8b1abf3052800122c220c393030303030303233333534\ -120c08c4ffffff0f10c0bfabf3051a0c08c4ffffff0f10c0bfabf3052800122c220c3\ -93030303030303233323033120c08c4ffffff0f10fcbfabf3051a0c08c4ffffff0f10\ -fcbfabf3052800122c220c393030303030303233323034120c08c4ffffff0f10f4c0a\ -bf3051a0c08c4ffffff0f10f4c0abf30528001220220c393030303030303536313031\ -120c08c4ffffff0f10b0c1abf3051a002800124f0a0134224a0a1c0a15317c3634353\ +0a090a03322e30100118011289020a01321a83020a1b0a15317c32353434357c327c3\ +8367c31323033323032302a026e331a1a0a0534303831331211552057697474656e62\ +657267706c61747a121c220c39303030303030353033303112001a08080010b8b1abf\ +3052800122c220c393030303030303233333534120c08c4ffffff0f10c0bfabf3051a\ +0c08c4ffffff0f10c0bfabf3052800122c220c393030303030303233323033120c08c\ +4ffffff0f10fcbfabf3051a0c08c4ffffff0f10fcbfabf3052800122c220c39303030\ +3030303233323034120c08c4ffffff0f10f4c0abf3051a0c08c4ffffff0f10f4c0abf\ +30528001220220c393030303030303536313031120c08c4ffffff0f10b0c1abf3051a\ +00280012520a0133224d0a1b0a15317c36343436367c317c38367c313230333230323\ +02a026e36421212105520416c742d4d617269656e646f7266120a0d66665042159a99\ +51413a0c3930303030303031323130362001124f0a0134224a0a1c0a15317c3634353\ 1327c317c38367c31323033323032302a036e3138420e120c55204d6f6872656e7374\ 722e120a0d6666524215666656413a0c3930303030303030353230352002`, 'hex'