Skip to content

Commit

Permalink
v3.5 Converted EMDN Tap to use compressed JSON stream.
Browse files Browse the repository at this point in the history
  • Loading branch information
kfsone committed Sep 6, 2014
1 parent 7ebea7f commit 34de92d
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 20 deletions.
16 changes: 7 additions & 9 deletions emdn-tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,11 @@ def processCommandLine():

pargs.duration = pargs.minutes * 60 + pargs.seconds

print("* Fetching EMDN data from {} to {}. Automatic commits {}.".format(
print("* Fetching EMDN data from {} to {}.".format(
pargs.firehoseURI or '['+Firehose.defaultURI+']',
pargs.db or '['+TradeDB.defaultDB+']',
pargs.db or '['+TradeDB.defaultDB+']'
))
print("* Automatic commits {}.".format(
'every {} seconds'.format(pargs.commit) if pargs.commit else 'disabled'
))

Expand Down Expand Up @@ -182,9 +184,6 @@ def main():

pargs = processCommandLine()

# For removing milliseconds from timestamps.
trimTimestampRe = re.compile(r'\.\d+$')

# Open the local TradeDangerous database
dbFilename = pargs.db or TradeDB.defaultDB
tdb = TradeDB(dbFilename=dbFilename, debug=1 if pargs.verbose else 0)
Expand All @@ -193,9 +192,9 @@ def main():
loadUIOrders(db)

# Open a connection to the firehose.
firehose = Firehose(pargs.firehoseURI)
firehose = Firehose(pargs.firehoseURI, debug=pargs.verbose)

if pargs.verbose: print("* Capture starting")
if pargs.verbose: print("* Capture starting.")

lastCommit, duration = time.time(), pargs.duration
recordsSinceLastCommit = []
Expand All @@ -213,8 +212,7 @@ def main():
if pargs.verbose and (records % 1000 == 0):
print("# At {} captured {} records.".format(rec.timestamp, records))
if pargs.verbose > 1:
ts = trimTimestampRe.sub('', rec.timestamp)
print("[{}] {:.<60} {}cr/{}cr".format(ts, '{} @ {}/{}'.format(rec.item, rec.system, rec.station), rec.payingCr, rec.askingCr))
print("[{}] {:.<60} {}cr/{}cr".format(rec.timestamp, '{} @ {}/{}'.format(rec.item, rec.system, rec.station), rec.payingCr, rec.askingCr))

# Find the item in the price database to get its data and make sure
# it matches the category we expect to see it listed in.
Expand Down
63 changes: 53 additions & 10 deletions emdn/firehose.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,14 @@
try:
import zmq
except ImportError:
raise ImportError("This module requires the ZeroMQ library to be installed. The easiest way to obtain this is to type: pip install pyzmq")

import time

raise ImportError("This module requires the ZeroMQ library to be installed. The easiest way to obtain this is to type: pip install pyzmq") from None
try: from itemrecord import ItemRecord
except ImportError: from . itemrecord import ItemRecord

import zlib
import json
import time

class Firehose(object):
"""
Encapsulates a connection to the Elite Market Data Network (EMDN)
Expand All @@ -52,10 +53,11 @@ class Firehose(object):
reads from a local file called 'emdn.csv'.
"""

defaultURI = 'tcp://firehose.elite-market-data.net:9050'
defaultURI = 'tcp://firehose.elite-market-data.net:9500'

def __init__(self, uri=None, ctx=None):
def __init__(self, uri=None, ctx=None, debug=0):
self.__uri = uri or Firehose.defaultURI
self.debug = debug

if self.__uri.find("file://") == 0:
def _poll(timeout):
Expand All @@ -74,7 +76,10 @@ def _poll(timeout):
return self.__socket.poll(timeout)
def _read(nonBlocking=False):
flags = zmq.NOBLOCK if nonBlocking else 0
result = self.__socket.recv_string(flags)
compressed = self.__socket.recv(flags)
uncompressed = zlib.decompress(compressed)
jsonString = uncompressed.decode('utf-8')
result = json.loads(jsonString)
if not result: raise EOFError()
return result

Expand All @@ -91,6 +96,41 @@ def _read(nonBlocking=False):
self.read = _read


def json_to_record(self, data):
record = None
try:
dataType = data['type']
if dataType == 'marketquote':
try:
message = data['message']
try:
return ItemRecord(
askingCr=message['buyPrice'],
payingCr=message['sellPrice'],
demand=message['demand'],
demandLevel=message['demandLevel'],
stock=message['stationStock'],
stockLevel=message['stationStockLevel'],
category=message['categoryName'],
item=message['itemName'],
location=message['stationName'],
timestamp=message['timestamp']
)
except KeyError as e:
print("jsonData['message']:", message)
raise ValueError("json marketquote data is missing a required field: {}".format(e))
except KeyError: # missing 'message' element
if self.debug > 2: print("# json data didn't contain a 'message'")
if self.debug > 3: print(data)
else: # not a marketquote
if self.debug > 2: print("# ignoring '{}'".format(dataType))
except KeyError: # missing 'type' field
if self.debug > 2: print("# invalid data, did not contain a 'type' key")
if self.debug > 3: print(data)

return None


def drink(self, records=None, timeout=None, burst=False):
"""
Drink from the firehose, yielding the data we retrieve as ItemRecords.
Expand Down Expand Up @@ -125,12 +165,15 @@ def drink(self, records=None, timeout=None, burst=False):
if pollResult:
while recordsRemaining:
try:
csv = self.read(nonBlocking=True)
jsData = self.read(nonBlocking=True)
except EOFError:
return
except (zmq.error.Again, BlockingIOError, InterruptedError):
break
yield ItemRecord(*(csv.split(',')))
recordsRemaining -= 1

record = self.json_to_record(jsData)
if record:
yield record
recordsRemaining -= 1
if burst:
return
6 changes: 5 additions & 1 deletion emdn/itemrecord.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,21 @@ class ItemRecord(object):
stockLevel -- Enumeration of stock level.
system -- Name of the star system this record is for.
station -- Name of the station this record is for.
category -- Which category heading the item is under.
item -- Game name for the item.
location -- Where the item was seen in "System (Station)" format.
timestamp -- Date/time of report (which is kind of crap, please don't use).
"""
systemStationRe = re.compile(r'^(.*?)\s*\((.*?)\)$')
timestampTrimRe = re.compile(r'^(\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2})')

def __init__(self, askingCr, payingCr, demand, demandLevel, stock, stockLevel, category, item, location, timestamp):
self.askingCr, self.payingCr = int(askingCr or 0), int(payingCr or 0)
self.demand, self.demandLevel = int(demand or 0), int(demandLevel or 0)
self.stock, self.stockLevel = int(stock or 0), int(stockLevel or 0)
self.category, self.item = category, item
self.system, self.station = ItemRecord.systemStationRe.match(location).group(1, 2)
self.timestamp = timestamp
self.timestamp = ItemRecord.timestampTrimRe.match(timestamp).group(1)

def str(self):
return "{},{},{},{},{},{},{},{},{} ({}),{}".format(
Expand Down

0 comments on commit 34de92d

Please sign in to comment.