Skip to content

Commit

Permalink
Added file:/// uris for EMDN module
Browse files Browse the repository at this point in the history
  • Loading branch information
kfsone committed Sep 4, 2014
1 parent a4401ce commit 9627651
Showing 1 changed file with 35 additions and 18 deletions.
53 changes: 35 additions & 18 deletions emdn/firehose.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,22 +47,44 @@ class Firehose(object):
"""
Encapsulates a connection to the Elite Market Data Network (EMDN)
live feed of price updates.
Also handles "file:///" URIs for local testing, e.g.
hose = Firehose("file://./emdn.csv")
reads from a local file called 'emdn.csv'.
"""

defaultURI = 'tcp://firehose.elite-market-data.net:9050'

def __init__(self, uri=None, ctx=None):
self.__uri = uri or Firehose.defaultURI

# All ZMQ operations are done through a Context,
# so use one we're given or create one for ourselves.
self.__ctx = ctx or zmq.Context()

# EMDN is using the pub/sub model, a bit overzealously,
# so we need a subscriber socket subscribed to nothing.
self.__socket = self.__ctx.socket(zmq.SUB)
self.__socket.setsockopt(zmq.SUBSCRIBE, ''.encode())
self.__socket.connect(self.__uri)
if self.__uri.find("file://") == 0:
def _poll(timeout):
return 1
def _read():
result = self.__filehandle.readline().strip()
if not result: raise EOFError()
return result

self.poll = _poll
self.read = _read
self.__filehandle = open(self.__uri[7:], 'r')
else:
def _poll(timeout):
return self.__socket.poll(timeout)
def _read():
return self.__socket.recv_string()

# All ZMQ operations are done through a Context,
# so use one we're given or create one for ourselves.
self.__ctx = ctx or zmq.Context()

# EMDN is using the pub/sub model, a bit overzealously,
# so we need a subscriber socket subscribed to nothing.
self.__socket = self.__ctx.socket(zmq.SUB)
self.__socket.setsockopt(zmq.SUBSCRIBE, ''.encode())
self.__socket.connect(self.__uri)
self.poll = _poll
self.read = _read


def drink(self, records=None, timeout=None, burst=False):
Expand All @@ -85,10 +107,6 @@ def drink(self, records=None, timeout=None, burst=False):
the first burst of data has been drained.
"""

if self.__socket.closed:
raise BrokenPipeError("Firehose socket is closed")

socket = self.__socket
maxPollDuration = timeout
recordsRemaining = records or 1
recordCost = 1 if records else 0
Expand All @@ -101,14 +119,13 @@ def drink(self, records=None, timeout=None, burst=False):
maxPollDuration = (cutoffTime - time.clock()) * 1000
if maxPollDuration <= 0:
return
if socket.poll(timeout=maxPollDuration):
if self.poll(timeout=maxPollDuration):
while recordsRemaining:
try:
csv = socket.recv_string(zmq.NOBLOCK)
except zmq.error.Again:
break
csv = self.read()
except (zmq.error.Again, EOFError):
return
yield ItemRecord(*(csv.split(',')))
recordsRemaining -= recordCost
if burst:
return

0 comments on commit 9627651

Please sign in to comment.