diff --git a/emdn/README.txt b/emdn/README.txt new file mode 100644 index 00000000..131261a6 --- /dev/null +++ b/emdn/README.txt @@ -0,0 +1,11 @@ +Simple Python module for accessing the Elite Market Data Network firehose. + +Copyright (C) Oliver 'kfsone' Smith 2014 : + You are free to use, redistribute, or even print and eat a copy of + this software so long as you include this copyright notice. + I guarantee there is at least one bug neither of us knew about. + +Initially written as part of the TradeDangerous trade calculator. + +For problems/issues/suggestions, see https://bitbucket.org/kfsone/tradedangerous/ + diff --git a/emdn/__init__.py b/emdn/__init__.py new file mode 100644 index 00000000..6850dc8e --- /dev/null +++ b/emdn/__init__.py @@ -0,0 +1,37 @@ +#! /usr/bin/env python +#--------------------------------------------------------------------- +# Copyright (C) Oliver 'kfsone' Smith 2014 : +# You are free to use, redistribute, or even print and eat a copy of +# this software so long as you include this copyright notice. +# I guarantee there is at least one bug neither of us knew about. +#--------------------------------------------------------------------- +# Elite Market Data Net :: Master Module + +""" + "Elite Market Data Net" (EMDN) is a ZeroMQ based service that + provides a near-realtime feed of market scrapes from the Elite + Dangerous universe. This feed is called the "firehose". + + emdn.ItemRecord class encapsulates a record as described by + the EMDN network. + + emdn.Firehose class encapsulates a connection to retrieve + ItemRecords in an iterative fashion. + + Example: + + import emdn + + firehose = emdn.Firehose() + # use firehose = emdn.Firehose(ctx=ctx) if you have your own zmq.Context + + # wait upto 10 seconds and retrieve upto 2 records: + for itemRec in firehose.drink(records=2, timeout=10.0) + pass + + # print everything else we receive + for itemRec in firehose.drink(): + print(itemRec) +""" + +__all__ = [ 'itemrecord', 'firehose' ] diff --git a/emdn/__main__.py b/emdn/__main__.py new file mode 100644 index 00000000..20c5da8c --- /dev/null +++ b/emdn/__main__.py @@ -0,0 +1,63 @@ +#! /usr/bin/env python +#--------------------------------------------------------------------- +# Copyright (C) Oliver 'kfsone' Smith 2014 : +# You are free to use, redistribute, or even print and eat a copy of +# this software so long as you include this copyright notice. +# I guarantee there is at least one bug neither of us knew about. +#--------------------------------------------------------------------- +# Elite Market Data Net :: Command line entry point +# Demonstrates how to use the firehose. Not intended for end-user use. + +print("Running Elite-Market-Data.net (EMDN) module tests.") + +try: from firehose import Firehose +except ImportError: from . firehose import Firehose + +firehose = Firehose() +print("Firehose ready.") +print() + +import time + + +def test(description, caveat, it): + test.testNo += 1 + + print('8x -->', 'BEGIN Test #{}:'.format(test.testNo), description, '<-- x8') + print(caveat) + print() + + totalRecords, lastSecond, recordsThisSecond = 0, 0, 0 + maxRecordsPerSecond = 3 + for rec in it: + totalRecords += 1 + # throttle how many lines we show + second = int(time.clock()) + if second != lastSecond: + if recordsThisSecond > maxRecordsPerSecond: + print("... and {} more".format(recordsThisSecond - maxRecordsPerSecond)) + lastSecond, recordsThisSecond = second, 0 + recordsThisSecond += 1 + if recordsThisSecond > maxRecordsPerSecond: continue + print(rec) + + if totalRecords == 0: + print("NOTE: No data received.") + elif recordsThisSecond > maxRecordsPerSecond: + print("... and {} more".format(recordsThisSecond - maxRecordsPerSecond)) + + print('8x <--', 'END Test #{}'.format(test.testNo), description, '--> x8') + print() + print() + +test.testNo = 0 + +test("Drink one record only", "This will hang until there is some data.", firehose.drink(records=1)) + +test("Drink for 30 seconds", "If there's no data, this will do nothing for 30 seconds.", firehose.drink(timeout=30.0)) + +test("Drink for 60 seconds or 10 records", "If there's no data, this will do nothing for 60 seconds.", firehose.drink(timeout=60.0, records=10)) + +test("Drink for 90 seconds, 30,000 records, or until the first 'burst' has ended.", "Guess what happens if there's no data?", firehose.drink(timeout=90, records=30000, burst=True)) + +print("- Done") diff --git a/emdn/firehose.py b/emdn/firehose.py new file mode 100644 index 00000000..d773028b --- /dev/null +++ b/emdn/firehose.py @@ -0,0 +1,114 @@ +#! /usr/bin/env python +#--------------------------------------------------------------------- +# Copyright (C) Oliver 'kfsone' Smith 2014 : +# You are free to use, redistribute, or even print and eat a copy of +# this software so long as you include this copyright notice. +# I guarantee there is at least one bug neither of us knew about. +#--------------------------------------------------------------------- +# Elite Market Data Net :: Modules :: Main Module + +""" + "Elite Market Data Net" (EMDN) is a ZeroMQ based service that + provides a near-realtime feed of market scrapes from the Elite + Dangerous universe. This feed is called the "firehose". + + emdn.ItemRecord class encapsulates a record as described by + the EMDN network. + + emdn.Firehose class encapsulates a connection to retrieve + ItemRecords in an iterative fashion. + + Example: + + from emdn.firehose import Firehose + firehose = Firehose() + # use firehose = Firehose(ctx=ctx) if you have your own zmq.Context + + # wait upto 10 seconds and retrieve upto 2 records: + for itemRec in firehose.drink(records=2, timeout=10.0) + pass + + # print everything else we receive + for itemRec in firehose.drink(): + print(itemRec) +""" + +try: + import zmq +except ImportError: + raise ImportError("This module requires the ZeroMQ library to be installed. The easiest way to obtain this is to type: pip install pyzmq") + +import time + +try: from itemrecord import ItemRecord +except ImportError: from . itemrecord import ItemRecord + +class Firehose(object): + """ + Encapsulates a connection to the Elite Market Data Network (EMDN) + live feed of price updates. + """ + + defaultURI = 'tcp://firehose.elite-market-data.net:9050' + + def __init__(self, uri=None, ctx=None): + self.__uri = uri or Firehose.defaultURI + + # All ZMQ operations are done through a Context, + # so use one we're given or create one for ourselves. + self.__ctx = ctx or zmq.Context() + + # EMDN is using the pub/sub model, a bit overzealously, + # so we need a subscriber socket subscribed to nothing. + self.__socket = self.__ctx.socket(zmq.SUB) + self.__socket.setsockopt(zmq.SUBSCRIBE, ''.encode()) + self.__socket.connect(self.__uri) + + + def drink(self, records=None, timeout=None, burst=False): + """ + Drink from the firehose, yielding the data we retrieve as ItemRecords. + + Keyword arguments: + records -- maximum number of records to yield before exiting. Default, 0, is unlimited. + timeout -- maximum total time to wait for data. + burst -- set True to stop after the first set of records are retrieved. + + e.g. + drink(records=50, timeout=300) + Reads until we have received 50 seconds or 300 seconds have passed. + + drink(records=50, timeout=10.5, burst=True) + Reads until the first of: + 50 records have been received, + 10.5 seconds has elapsed, + the first burst of data has been drained. + """ + + if self.__socket.closed: + raise BrokenPipeError("Firehose socket is closed") + + socket = self.__socket + maxPollDuration = timeout + recordsRemaining = records or 1 + recordCost = 1 if records else 0 + + if timeout: + cutoffTime = time.clock() + timeout + + while recordsRemaining: + if timeout: + maxPollDuration = (cutoffTime - time.clock()) * 1000 + if maxPollDuration <= 0: + return + if socket.poll(timeout=maxPollDuration): + while recordsRemaining: + try: + csv = socket.recv_string(zmq.NOBLOCK) + except zmq.error.Again: + break + yield ItemRecord(*(csv.split(','))) + recordsRemaining -= recordCost + if burst: + return + diff --git a/emdn/itemrecord.py b/emdn/itemrecord.py new file mode 100644 index 00000000..d4c22303 --- /dev/null +++ b/emdn/itemrecord.py @@ -0,0 +1,35 @@ +#! /usr/bin/env python +#--------------------------------------------------------------------- +# Copyright (C) Oliver 'kfsone' Smith 2014 : +# You are free to use, redistribute, or even print and eat a copy of +# this software so long as you include this copyright notice. +# I guarantee there is at least one bug neither of us knew about. +#--------------------------------------------------------------------- +# Elite Market Data Net :: Modules :: ItemRecord +# EMDN record description. + +import re + +class ItemRecord(object): + """ + Describes a record pulled from the Firehose. + """ + systemStationRe = re.compile(r'^(.*?)\s*\((.*?)\)$') + + def __init__(self, askingCr, payingCr, demand, demandLevel, stock, stockLevel, category, item, location, timestamp): + self.askingCr, self.payingCr = int(askingCr or 0), int(payingCr or 0) + self.demand, self.demandLevel = int(demand or 0), int(demandLevel or 0) + self.stock, self.stockLevel = int(stock or 0), int(stockLevel or 0) + self.category, self.item = category, item + self.system, self.station = ItemRecord.systemStationRe.match(location).group(1, 2) + self.timestamp = timestamp + + def str(self): + return "{},{},{},{},{},{},{},{},{} ({}),{}".format( + self.askingCr, self.payingCr, self.demand, self.demandLevel, self.stock, self.stockLevel, self.category, self.item, self.system, self.station, self.timestamp + ) + + def __repr__(self): + return "ItemRecord(askingCr={}, payingCr={}, demand={}, demandLevel={}, stock={}, stockLevel={}, category=\"{}\", item=\"{}\", location=\"{} ({})\", timestamp='{}')".format( + self.askingCr, self.payingCr, self.demand, self.demandLevel, self.stock, self.stockLevel, re.escape(self.category), re.escape(self.item), re.escape(self.system), re.escape(self.station), self.timestamp + )