diff --git a/emdn-tap.py b/emdn-tap.py index f5221db9..9226843c 100644 --- a/emdn-tap.py +++ b/emdn-tap.py @@ -184,6 +184,8 @@ def commit(tdb, db, recordsSinceLastCommit, pargs): If --no-writes was specified, does everything except the actual commit. """ if not recordsSinceLastCommit: + if pargs.verbose > 2: + print("-> no records to commit.") return if pargs.verbose: @@ -197,8 +199,8 @@ def commit(tdb, db, recordsSinceLastCommit, pargs): # Rebuild prices dbFilename = tdb.dbURI - if pargs.verbose: - print("- Rebuild prices file" + (" [disabled]" if pargs.noWrites else "")) + if pargs.verbose > 1: + print("-> Rebuild prices file" + (" [disabled]" if pargs.noWrites else "")) if not pargs.noWrites: with tdb.pricesPath.open("w") as pricesFile: @@ -227,72 +229,89 @@ def main(): if pargs.verbose: print("* Capture starting.") - lastCommit, duration = time.time(), pargs.duration + now = time.time() + lastCommit, duration = now, pargs.duration recordsSinceLastCommit = [] + if duration: + endOfRun = now + duration + else: + endOfRun = None + + def consumeRecord(rec): + nonlocal pargs, records, recordsSinceLastCommit, tdb + + if rec.payingCr == 0 and rec.askingCr == 0: + if pargs.verbose > 2: + print("# Ignoring 0/0 entry for {} @ {}/{}".format(rec.item, rec.system, rec.station)) + if rec.payingCr < 0 or rec.askingCr < 0 \ + or rec.stock < 0 or rec.stockLevel < 0 \ + or rec.demand < 0 or rec.demandLevel < 0: + bleat("item", '{}@{}/{}'.format(rec.item, rec.system, rec.station), "Invalid (negative) value in price/stock fields") + return + + records += 1 + + if pargs.verbose and (records % 1000 == 0): + print("# At {} captured {} records.".format(rec.timestamp, records)) + + if pargs.verbose > 1: + paying = localedNo(rec.payingCr)+'cr' if rec.payingCr else ' - ' + asking = localedNo(rec.askingCr)+'cr' if rec.askingCr else ' - ' + desc = '{} @ {}/{}'.format(rec.item, rec.system, rec.station) + extra = " | {:>6}L{} {:>6}L{}".format(rec.demand, rec.demandLevel, rec.stock, rec.stockLevel) if pargs.verbose > 2 else "" + print("{} {:.<65} {:>9} {:>9}{}".format(rec.timestamp, desc, paying, asking, extra)) + + # As of Beta 1.04, if you are carrying an item that the station doesn't handle + # the UI shows a fake entry with the prices from the station you bought the + # item from. + + if rec.demandLevel == 0 and rec.stockLevel == 0: + if pargs.verbose > 2: + warning("Ignoring no-demand entry for {} @ {}/{}".format(rec.item, rec.system, rec.station)) + return + + # Find the item in the price database to get its data and make sure + # it matches the category we expect to see it listed in. + try: + item = tdb.lookupItem(rec.item) + if TradeDB.normalizedStr(item.category.dbname) != TradeDB.normalizedStr(rec.category): + bleat("item", rec.item, "\aCATEGORY MISMATCH: {}/{} => item: {}/{} aka {}".format(rec.category, rec.item, item.category.dbname, item.dbname, item.altname or 'None')) + return + except LookupError: + if not rec.item in blackMarketItems: + bleat("item", rec.item, "UNRECOGNIZED ITEM:", rec.item) + return + + # Lookup the station. + try: system = tdb.lookupSystem(rec.system) + except LookupError: + bleat("system", rec.system, "UNRECOGNIZED SYSTEM:", rec.system) + return + + try: station = tdb.lookupStation(rec.station, system=system) + except LookupError: + bleat("station", rec.station, "UNRECOGNIZED STATION:", rec.system, rec.station) + return + + uiOrder = getItemUIOrder(station.ID, item.category.ID, item.ID) + recordsSinceLastCommit.append([ item.ID, station.ID, uiOrder, rec.payingCr, rec.askingCr, rec.demand, rec.demandLevel, rec.stock, rec.stockLevel ]) + + try: - for rec in firehose.drink(records=pargs.records, timeout=duration or None): - if pargs.commit: - now = time.time() - if now >= lastCommit + pargs.commit: - commit(tdb, db, recordsSinceLastCommit, pargs) - recordsSinceLastCommit = [] - lastCommit = now - - if rec.payingCr == 0 and rec.askingCr == 0: - if pargs.verbose > 2: - print("# Ignoring 0/0 entry for {} @ {}/{}".format(rec.item, rec.system, rec.station)) - if rec.payingCr < 0 or rec.askingCr < 0 \ - or rec.stock < 0 or rec.stockLevel < 0 \ - or rec.demand < 0 or rec.demandLevel < 0: - bleat("item", '{}@{}/{}'.format(rec.item, rec.system, rec.station), "Invalid (negative) value in price/stock fields") - continue - - records += 1 - - if pargs.verbose and (records % 1000 == 0): - print("# At {} captured {} records.".format(rec.timestamp, records)) - - if pargs.verbose > 1: - paying = localedNo(rec.payingCr)+'cr' if rec.payingCr else ' - ' - asking = localedNo(rec.askingCr)+'cr' if rec.askingCr else ' - ' - desc = '{} @ {}/{}'.format(rec.item, rec.system, rec.station) - extra = " | {:>6}L{} {:>6}L{}".format(rec.demand, rec.demandLevel, rec.stock, rec.stockLevel) if pargs.verbose > 2 else "" - print("{} {:.<65} {:>9} {:>9}{}".format(rec.timestamp, desc, paying, asking, extra)) - - # As of Beta 1.04, if you are carrying an item that the station doesn't handle - # the UI shows a fake entry with the prices from the station you bought the - # item from. - - if rec.demandLevel == 0 and rec.stockLevel == 0: - if pargs.verbose > 2: - warning("Ignoring no-demand entry for {} @ {}/{}".format(rec.item, rec.system, rec.station)) - continue - - # Find the item in the price database to get its data and make sure - # it matches the category we expect to see it listed in. - try: - item = tdb.lookupItem(rec.item) - if TradeDB.normalizedStr(item.category.dbname) != TradeDB.normalizedStr(rec.category): - bleat("item", rec.item, "\aCATEGORY MISMATCH: {}/{} => item: {}/{} aka {}".format(rec.category, rec.item, item.category.dbname, item.dbname, item.altname or 'None')) - continue - except LookupError: - if not rec.item in blackMarketItems: - bleat("item", rec.item, "UNRECOGNIZED ITEM:", rec.item) - continue - - # Lookup the station. - try: system = tdb.lookupSystem(rec.system) - except LookupError: - bleat("system", rec.system, "UNRECOGNIZED SYSTEM:", rec.system) - continue - - try: station = tdb.lookupStation(rec.station, system=system) - except LookupError: - bleat("station", rec.station, "UNRECOGNIZED STATION:", rec.system, rec.station) - continue - - uiOrder = getItemUIOrder(station.ID, item.category.ID, item.ID) - recordsSinceLastCommit.append([ item.ID, station.ID, uiOrder, rec.payingCr, rec.askingCr, rec.demand, rec.demandLevel, rec.stock, rec.stockLevel ]) + now = time.time() + while True: + nextCommit = lastCommit + pargs.commit + timeLeft = cutoffTime - now if duration else nextCommit - now + timeout = min(timeLeft, nextCommit - now) or None + for rec in firehose.drink(records=pargs.records, timeout=timeout): + consumeRecord(rec) + if pargs.verbose > 2: + print("- tick") + lastCommit = now = time.time() + commit(tdb, db, recordsSinceLastCommit, pargs) + recordsSinceLastCommit = [] + if endOfRun and now >= endOfRun: + break except KeyboardInterrupt: print("Ctrl-C pressed, stopping.") diff --git a/emdn/firehose.py b/emdn/firehose.py index b381f0be..d52bab58 100644 --- a/emdn/firehose.py +++ b/emdn/firehose.py @@ -73,6 +73,7 @@ def _read(nonBlocking=False): else: def _poll(timeout): #### TODO: Use a ZMQ Poller so we can catch ctrl-c + # Bleah, current version of pyzmq neither works. return self.__socket.poll(timeout) def _read(nonBlocking=False): flags = zmq.NOBLOCK if nonBlocking else 0