Skip to content

Commit

Permalink
emdn-tap now tries harder to honor --commit intervals
Browse files Browse the repository at this point in the history
  • Loading branch information
kfsone committed Sep 15, 2014
1 parent ca763a8 commit 4ccd5db
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 66 deletions.
151 changes: 85 additions & 66 deletions emdn-tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ def commit(tdb, db, recordsSinceLastCommit, pargs):
If --no-writes was specified, does everything except the actual commit.
"""
if not recordsSinceLastCommit:
if pargs.verbose > 2:
print("-> no records to commit.")
return

if pargs.verbose:
Expand All @@ -197,8 +199,8 @@ def commit(tdb, db, recordsSinceLastCommit, pargs):

# Rebuild prices
dbFilename = tdb.dbURI
if pargs.verbose:
print("- Rebuild prices file" + (" [disabled]" if pargs.noWrites else ""))
if pargs.verbose > 1:
print("-> Rebuild prices file" + (" [disabled]" if pargs.noWrites else ""))

if not pargs.noWrites:
with tdb.pricesPath.open("w") as pricesFile:
Expand Down Expand Up @@ -227,72 +229,89 @@ def main():

if pargs.verbose: print("* Capture starting.")

lastCommit, duration = time.time(), pargs.duration
now = time.time()
lastCommit, duration = now, pargs.duration
recordsSinceLastCommit = []
if duration:
endOfRun = now + duration
else:
endOfRun = None

def consumeRecord(rec):
nonlocal pargs, records, recordsSinceLastCommit, tdb

if rec.payingCr == 0 and rec.askingCr == 0:
if pargs.verbose > 2:
print("# Ignoring 0/0 entry for {} @ {}/{}".format(rec.item, rec.system, rec.station))
if rec.payingCr < 0 or rec.askingCr < 0 \
or rec.stock < 0 or rec.stockLevel < 0 \
or rec.demand < 0 or rec.demandLevel < 0:
bleat("item", '{}@{}/{}'.format(rec.item, rec.system, rec.station), "Invalid (negative) value in price/stock fields")
return

records += 1

if pargs.verbose and (records % 1000 == 0):
print("# At {} captured {} records.".format(rec.timestamp, records))

if pargs.verbose > 1:
paying = localedNo(rec.payingCr)+'cr' if rec.payingCr else ' - '
asking = localedNo(rec.askingCr)+'cr' if rec.askingCr else ' - '
desc = '{} @ {}/{}'.format(rec.item, rec.system, rec.station)
extra = " | {:>6}L{} {:>6}L{}".format(rec.demand, rec.demandLevel, rec.stock, rec.stockLevel) if pargs.verbose > 2 else ""
print("{} {:.<65} {:>9} {:>9}{}".format(rec.timestamp, desc, paying, asking, extra))

# As of Beta 1.04, if you are carrying an item that the station doesn't handle
# the UI shows a fake entry with the prices from the station you bought the
# item from.

if rec.demandLevel == 0 and rec.stockLevel == 0:
if pargs.verbose > 2:
warning("Ignoring no-demand entry for {} @ {}/{}".format(rec.item, rec.system, rec.station))
return

# Find the item in the price database to get its data and make sure
# it matches the category we expect to see it listed in.
try:
item = tdb.lookupItem(rec.item)
if TradeDB.normalizedStr(item.category.dbname) != TradeDB.normalizedStr(rec.category):
bleat("item", rec.item, "\aCATEGORY MISMATCH: {}/{} => item: {}/{} aka {}".format(rec.category, rec.item, item.category.dbname, item.dbname, item.altname or 'None'))
return
except LookupError:
if not rec.item in blackMarketItems:
bleat("item", rec.item, "UNRECOGNIZED ITEM:", rec.item)
return

# Lookup the station.
try: system = tdb.lookupSystem(rec.system)
except LookupError:
bleat("system", rec.system, "UNRECOGNIZED SYSTEM:", rec.system)
return

try: station = tdb.lookupStation(rec.station, system=system)
except LookupError:
bleat("station", rec.station, "UNRECOGNIZED STATION:", rec.system, rec.station)
return

uiOrder = getItemUIOrder(station.ID, item.category.ID, item.ID)
recordsSinceLastCommit.append([ item.ID, station.ID, uiOrder, rec.payingCr, rec.askingCr, rec.demand, rec.demandLevel, rec.stock, rec.stockLevel ])


try:
for rec in firehose.drink(records=pargs.records, timeout=duration or None):
if pargs.commit:
now = time.time()
if now >= lastCommit + pargs.commit:
commit(tdb, db, recordsSinceLastCommit, pargs)
recordsSinceLastCommit = []
lastCommit = now

if rec.payingCr == 0 and rec.askingCr == 0:
if pargs.verbose > 2:
print("# Ignoring 0/0 entry for {} @ {}/{}".format(rec.item, rec.system, rec.station))
if rec.payingCr < 0 or rec.askingCr < 0 \
or rec.stock < 0 or rec.stockLevel < 0 \
or rec.demand < 0 or rec.demandLevel < 0:
bleat("item", '{}@{}/{}'.format(rec.item, rec.system, rec.station), "Invalid (negative) value in price/stock fields")
continue

records += 1

if pargs.verbose and (records % 1000 == 0):
print("# At {} captured {} records.".format(rec.timestamp, records))

if pargs.verbose > 1:
paying = localedNo(rec.payingCr)+'cr' if rec.payingCr else ' - '
asking = localedNo(rec.askingCr)+'cr' if rec.askingCr else ' - '
desc = '{} @ {}/{}'.format(rec.item, rec.system, rec.station)
extra = " | {:>6}L{} {:>6}L{}".format(rec.demand, rec.demandLevel, rec.stock, rec.stockLevel) if pargs.verbose > 2 else ""
print("{} {:.<65} {:>9} {:>9}{}".format(rec.timestamp, desc, paying, asking, extra))

# As of Beta 1.04, if you are carrying an item that the station doesn't handle
# the UI shows a fake entry with the prices from the station you bought the
# item from.

if rec.demandLevel == 0 and rec.stockLevel == 0:
if pargs.verbose > 2:
warning("Ignoring no-demand entry for {} @ {}/{}".format(rec.item, rec.system, rec.station))
continue

# Find the item in the price database to get its data and make sure
# it matches the category we expect to see it listed in.
try:
item = tdb.lookupItem(rec.item)
if TradeDB.normalizedStr(item.category.dbname) != TradeDB.normalizedStr(rec.category):
bleat("item", rec.item, "\aCATEGORY MISMATCH: {}/{} => item: {}/{} aka {}".format(rec.category, rec.item, item.category.dbname, item.dbname, item.altname or 'None'))
continue
except LookupError:
if not rec.item in blackMarketItems:
bleat("item", rec.item, "UNRECOGNIZED ITEM:", rec.item)
continue

# Lookup the station.
try: system = tdb.lookupSystem(rec.system)
except LookupError:
bleat("system", rec.system, "UNRECOGNIZED SYSTEM:", rec.system)
continue

try: station = tdb.lookupStation(rec.station, system=system)
except LookupError:
bleat("station", rec.station, "UNRECOGNIZED STATION:", rec.system, rec.station)
continue

uiOrder = getItemUIOrder(station.ID, item.category.ID, item.ID)
recordsSinceLastCommit.append([ item.ID, station.ID, uiOrder, rec.payingCr, rec.askingCr, rec.demand, rec.demandLevel, rec.stock, rec.stockLevel ])
now = time.time()
while True:
nextCommit = lastCommit + pargs.commit
timeLeft = cutoffTime - now if duration else nextCommit - now
timeout = min(timeLeft, nextCommit - now) or None
for rec in firehose.drink(records=pargs.records, timeout=timeout):
consumeRecord(rec)
if pargs.verbose > 2:
print("- tick")
lastCommit = now = time.time()
commit(tdb, db, recordsSinceLastCommit, pargs)
recordsSinceLastCommit = []
if endOfRun and now >= endOfRun:
break
except KeyboardInterrupt:
print("Ctrl-C pressed, stopping.")

Expand Down
1 change: 1 addition & 0 deletions emdn/firehose.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ def _read(nonBlocking=False):
else:
def _poll(timeout):
#### TODO: Use a ZMQ Poller so we can catch ctrl-c
# Bleah, current version of pyzmq neither works.
return self.__socket.poll(timeout)
def _read(nonBlocking=False):
flags = zmq.NOBLOCK if nonBlocking else 0
Expand Down

0 comments on commit 4ccd5db

Please sign in to comment.