From 34de92da7ef2a076a305dea5bf1386392ca887bf Mon Sep 17 00:00:00 2001 From: Oliver Smith Date: Sat, 6 Sep 2014 14:37:45 -0700 Subject: [PATCH] v3.5 Converted EMDN Tap to use compressed JSON stream. --- emdn-tap.py | 16 ++++++------ emdn/firehose.py | 63 ++++++++++++++++++++++++++++++++++++++-------- emdn/itemrecord.py | 6 ++++- 3 files changed, 65 insertions(+), 20 deletions(-) diff --git a/emdn-tap.py b/emdn-tap.py index 043bab79..288e2160 100644 --- a/emdn-tap.py +++ b/emdn-tap.py @@ -70,9 +70,11 @@ def processCommandLine(): pargs.duration = pargs.minutes * 60 + pargs.seconds - print("* Fetching EMDN data from {} to {}. Automatic commits {}.".format( + print("* Fetching EMDN data from {} to {}.".format( pargs.firehoseURI or '['+Firehose.defaultURI+']', - pargs.db or '['+TradeDB.defaultDB+']', + pargs.db or '['+TradeDB.defaultDB+']' + )) + print("* Automatic commits {}.".format( 'every {} seconds'.format(pargs.commit) if pargs.commit else 'disabled' )) @@ -182,9 +184,6 @@ def main(): pargs = processCommandLine() - # For removing milliseconds from timestamps. - trimTimestampRe = re.compile(r'\.\d+$') - # Open the local TradeDangerous database dbFilename = pargs.db or TradeDB.defaultDB tdb = TradeDB(dbFilename=dbFilename, debug=1 if pargs.verbose else 0) @@ -193,9 +192,9 @@ def main(): loadUIOrders(db) # Open a connection to the firehose. - firehose = Firehose(pargs.firehoseURI) + firehose = Firehose(pargs.firehoseURI, debug=pargs.verbose) - if pargs.verbose: print("* Capture starting") + if pargs.verbose: print("* Capture starting.") lastCommit, duration = time.time(), pargs.duration recordsSinceLastCommit = [] @@ -213,8 +212,7 @@ def main(): if pargs.verbose and (records % 1000 == 0): print("# At {} captured {} records.".format(rec.timestamp, records)) if pargs.verbose > 1: - ts = trimTimestampRe.sub('', rec.timestamp) - print("[{}] {:.<60} {}cr/{}cr".format(ts, '{} @ {}/{}'.format(rec.item, rec.system, rec.station), rec.payingCr, rec.askingCr)) + print("[{}] {:.<60} {}cr/{}cr".format(rec.timestamp, '{} @ {}/{}'.format(rec.item, rec.system, rec.station), rec.payingCr, rec.askingCr)) # Find the item in the price database to get its data and make sure # it matches the category we expect to see it listed in. diff --git a/emdn/firehose.py b/emdn/firehose.py index 7bdcd983..fa369eea 100644 --- a/emdn/firehose.py +++ b/emdn/firehose.py @@ -36,13 +36,14 @@ try: import zmq except ImportError: - raise ImportError("This module requires the ZeroMQ library to be installed. The easiest way to obtain this is to type: pip install pyzmq") - -import time - + raise ImportError("This module requires the ZeroMQ library to be installed. The easiest way to obtain this is to type: pip install pyzmq") from None try: from itemrecord import ItemRecord except ImportError: from . itemrecord import ItemRecord +import zlib +import json +import time + class Firehose(object): """ Encapsulates a connection to the Elite Market Data Network (EMDN) @@ -52,10 +53,11 @@ class Firehose(object): reads from a local file called 'emdn.csv'. """ - defaultURI = 'tcp://firehose.elite-market-data.net:9050' + defaultURI = 'tcp://firehose.elite-market-data.net:9500' - def __init__(self, uri=None, ctx=None): + def __init__(self, uri=None, ctx=None, debug=0): self.__uri = uri or Firehose.defaultURI + self.debug = debug if self.__uri.find("file://") == 0: def _poll(timeout): @@ -74,7 +76,10 @@ def _poll(timeout): return self.__socket.poll(timeout) def _read(nonBlocking=False): flags = zmq.NOBLOCK if nonBlocking else 0 - result = self.__socket.recv_string(flags) + compressed = self.__socket.recv(flags) + uncompressed = zlib.decompress(compressed) + jsonString = uncompressed.decode('utf-8') + result = json.loads(jsonString) if not result: raise EOFError() return result @@ -91,6 +96,41 @@ def _read(nonBlocking=False): self.read = _read + def json_to_record(self, data): + record = None + try: + dataType = data['type'] + if dataType == 'marketquote': + try: + message = data['message'] + try: + return ItemRecord( + askingCr=message['buyPrice'], + payingCr=message['sellPrice'], + demand=message['demand'], + demandLevel=message['demandLevel'], + stock=message['stationStock'], + stockLevel=message['stationStockLevel'], + category=message['categoryName'], + item=message['itemName'], + location=message['stationName'], + timestamp=message['timestamp'] + ) + except KeyError as e: + print("jsonData['message']:", message) + raise ValueError("json marketquote data is missing a required field: {}".format(e)) + except KeyError: # missing 'message' element + if self.debug > 2: print("# json data didn't contain a 'message'") + if self.debug > 3: print(data) + else: # not a marketquote + if self.debug > 2: print("# ignoring '{}'".format(dataType)) + except KeyError: # missing 'type' field + if self.debug > 2: print("# invalid data, did not contain a 'type' key") + if self.debug > 3: print(data) + + return None + + def drink(self, records=None, timeout=None, burst=False): """ Drink from the firehose, yielding the data we retrieve as ItemRecords. @@ -125,12 +165,15 @@ def drink(self, records=None, timeout=None, burst=False): if pollResult: while recordsRemaining: try: - csv = self.read(nonBlocking=True) + jsData = self.read(nonBlocking=True) except EOFError: return except (zmq.error.Again, BlockingIOError, InterruptedError): break - yield ItemRecord(*(csv.split(','))) - recordsRemaining -= 1 + + record = self.json_to_record(jsData) + if record: + yield record + recordsRemaining -= 1 if burst: return diff --git a/emdn/itemrecord.py b/emdn/itemrecord.py index ac61e3b0..f777c9ce 100644 --- a/emdn/itemrecord.py +++ b/emdn/itemrecord.py @@ -24,9 +24,13 @@ class ItemRecord(object): stockLevel -- Enumeration of stock level. system -- Name of the star system this record is for. station -- Name of the station this record is for. + category -- Which category heading the item is under. + item -- Game name for the item. + location -- Where the item was seen in "System (Station)" format. timestamp -- Date/time of report (which is kind of crap, please don't use). """ systemStationRe = re.compile(r'^(.*?)\s*\((.*?)\)$') + timestampTrimRe = re.compile(r'^(\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2})') def __init__(self, askingCr, payingCr, demand, demandLevel, stock, stockLevel, category, item, location, timestamp): self.askingCr, self.payingCr = int(askingCr or 0), int(payingCr or 0) @@ -34,7 +38,7 @@ def __init__(self, askingCr, payingCr, demand, demandLevel, stock, stockLevel, c self.stock, self.stockLevel = int(stock or 0), int(stockLevel or 0) self.category, self.item = category, item self.system, self.station = ItemRecord.systemStationRe.match(location).group(1, 2) - self.timestamp = timestamp + self.timestamp = ItemRecord.timestampTrimRe.match(timestamp).group(1) def str(self): return "{},{},{},{},{},{},{},{},{} ({}),{}".format(