feat: better eddblink progress reporting
- small tweaks to improve performance of eddblink import,
- commit batching to try and improve eddblink import speed,
- use rich progress bars to give better insight into commit rate

One interesting thing to try is changing the progress bar description around the COMMIT while importing prices:

```
prog.increment(description="COMMIT")
cursor.execute("COMMIT")
transaction_items = 0
cursor.execute("BEGIN TRANSACTION")
prog.increment(description="Processing")
```

Then adjust the batch size. When the batch size is too low, this makes the description flicker, but you can get a sense of how increasing the batch size reduces the total import time, until the memory/WAL growth starts to make each commit excessively long.
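
For a standalone feel for that trade-off, here is a minimal sketch (not code from this commit; the table name and row shape are invented) that batches inserts into explicit transactions and times several batch sizes. An in-memory database keeps it self-contained; against an on-disk database in WAL mode the large-batch slowdown described above shows up more clearly.

```python
# Minimal sketch, assuming nothing from tradedangerous: time batched inserts
# at several batch sizes to watch per-commit overhead shrink as batches grow.
import sqlite3
import time

def timed_import(rows, batch_size: int) -> float:
    # isolation_level=None disables the sqlite3 module's implicit transaction
    # handling, so the explicit BEGIN/COMMIT below is the only control.
    db = sqlite3.connect(":memory:", isolation_level=None)
    db.execute("CREATE TABLE listing (station_id INTEGER, item_id INTEGER, price INTEGER)")
    cursor = db.cursor()
    started = time.perf_counter()
    cursor.execute("BEGIN TRANSACTION")
    pending = 0
    for row in rows:
        cursor.execute("INSERT INTO listing VALUES (?, ?, ?)", row)
        pending += 1
        if pending >= batch_size:
            cursor.execute("COMMIT")        # the phase a "COMMIT" description would cover
            pending = 0
            cursor.execute("BEGIN TRANSACTION")
    cursor.execute("COMMIT")                # flush the final partial batch
    return time.perf_counter() - started

rows = [(i % 1000, i % 64, i) for i in range(200_000)]
for batch_size in (1024, 8 * 1024, 32 * 1024, 128 * 1024):
    print(f"batch={batch_size:>6}: {timed_import(rows, batch_size):.3f}s")
```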
kfsone authored and eyeonus committed May 6, 2024
1 parent 6f188a4 commit f3af244
Showing 3 changed files with 59 additions and 28 deletions.
tradedangerous/cache.py (18 additions & 7 deletions)
```diff
@@ -30,6 +30,8 @@
 import sys
 import typing
 
+from functools import partial as partial_fn
+from .fs import file_line_count
 from .tradeexcept import TradeException
 from tradedangerous.misc.progress import Progress, CountingBar
 from . import corrections, utils
@@ -38,6 +40,7 @@
 
 # For mypy/pylint type checking
 if typing.TYPE_CHECKING:
+    from collections.abc import Callable
     from typing import Any, Optional, TextIO
     
     from .tradeenv import TradeEnv
@@ -778,11 +781,14 @@ def deprecationCheckItem(importPath, lineNo, line):
     )
 
 
-def processImportFile(tdenv, db, importPath, tableName):
+def processImportFile(tdenv, db, importPath, tableName, *, line_callback: Optional[Callable] = None, call_args: Optional[dict] = None):
     tdenv.DEBUG0(
         "Processing import file '{}' for table '{}'",
         str(importPath), tableName
     )
+    call_args = call_args or {}
+    if line_callback:
+        line_callback = partial_fn(line_callback, **call_args)
     
     fkeySelectStr = (
         "("
@@ -871,6 +877,8 @@ def processImportFile(tdenv, db, importPath, tableName):
     uniqueIndex = {}
     
     for linein in csvin:
+        if line_callback:
+            line_callback()
        if not linein:
            continue
        lineNo = csvin.line_num
@@ -978,12 +986,15 @@ def buildCache(tdb, tdenv):
    tempDB.executescript(sqlScript)
    
    # import standard tables
-    with Progress(max_value=len(tdb.importTables) + 1, width=25, style=CountingBar) as prog:
-        for (importName, importTable) in tdb.importTables:
-            with prog.sub_task(description=importName, max_value=None):
-                prog.increment(value=1, description=importName)
+    with Progress(max_value=len(tdb.importTables) + 1, prefix="Importing", width=25, style=CountingBar) as prog:
+        for importName, importTable in tdb.importTables:
+            import_path = Path(importName)
+            import_lines = file_line_count(import_path, missing_ok=True)
+            with prog.sub_task(max_value=import_lines, description=importTable) as child:
+                prog.increment(value=1)
+                call_args = {'task': child, 'advance': 1}
                try:
-                    processImportFile(tdenv, tempDB, Path(importName), importTable)
+                    processImportFile(tdenv, tempDB, import_path, importTable, line_callback=prog.update_task, call_args=call_args)
                except FileNotFoundError:
                    tdenv.DEBUG0(
                        "WARNING: processImportFile found no {} file", importName
@@ -994,7 +1005,7 @@ def buildCache(tdb, tdenv):
                        "Remove it or add the column definition line.",
                        importName
                    )
-            prog.increment(1)
+                prog.increment(1)
```
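
The `line_callback`/`call_args` plumbing above is plain partial application: `buildCache` hands over a bare callable (`prog.update_task`) plus keyword arguments (`{'task': child, 'advance': 1}`), and `processImportFile` binds them once with `functools.partial` so the per-row cost is a zero-argument call. A minimal sketch of the same pattern, with an invented stand-in for the progress object:

```python
# Minimal sketch of the pre-bound callback pattern; update_task here is an
# invented stand-in, not the tradedangerous Progress API.
from functools import partial

def update_task(*, task: str, advance: int) -> None:
    print(f"{task}: +{advance}")  # a real implementation would advance a progress bar

call_args = {"task": "Station.csv", "advance": 1}
line_callback = partial(update_task, **call_args)  # bind kwargs once, up front

for _row in ("a", "b", "c"):  # stand-in for the CSV rows
    line_callback()           # hot loop: no argument plumbing per row
```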
tradedangerous/misc/progress.py (1 addition & 1 deletion)
```diff
@@ -89,7 +89,7 @@ def __init__(self,
            # What fields to display.
            *style_instance.columns,
            # Hide it once it's finished, update it for us, 4x a second
-            transient=True, auto_refresh=True, refresh_per_second=4
+            transient=True, auto_refresh=True, refresh_per_second=5
        )
        
        # Now we add an actual task to track progress on.
```
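
For context, this wrapper configures a rich progress display. Here is a minimal sketch of what those keyword arguments do in rich itself: a transient bar that repaints five times a second and disappears when finished. The columns chosen are illustrative, not the wrapper's actual set.

```python
# Minimal rich sketch: transient (auto-hides when done), auto-refreshing
# at 5 frames/second regardless of how often tasks advance.
import time
from rich.progress import BarColumn, Progress, TextColumn

with Progress(
    TextColumn("[progress.description]{task.description}"),
    BarColumn(),
    transient=True, auto_refresh=True, refresh_per_second=5,
) as progress:
    task = progress.add_task("Importing", total=100)
    for _ in range(100):
        progress.advance(task, 1)  # updates state; redraws happen on the refresh timer
        time.sleep(0.01)
```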
tradedangerous/plugins/eddblink_plug.py (40 additions & 20 deletions)
```diff
@@ -182,12 +182,16 @@ def importListings(self, listings_file):
        """
        listings_path = Path(self.dataPath, listings_file).absolute()
        from_live = listings_path != Path(self.dataPath, self.listingsPath).absolute()
-        self.tdenv.NOTE("Processing market data from {}: Start time = {}. Live = {}", listings_file, self.now(), from_live)
+        
+        self.tdenv.NOTE("Checking listings")
        total = _count_listing_entries(self.tdenv, listings_path)
        if not total:
            self.tdenv.NOTE("No listings")
            return
        
+        self.tdenv.NOTE("Processing market data from {}: Start time = {}. Live = {}", listings_file, self.now(), from_live)
+        
+        db = self.tdb.getDB()
        stmt_unliven_station = """UPDATE StationItem SET from_live = 0 WHERE station_id = ?"""
        stmt_flush_station = """DELETE from StationItem WHERE station_id = ?"""
        stmt_add_listing = """
@@ -204,20 +208,25 @@ def importListings(self, listings_file):
        """
        
        # Fetch all the items IDS
-        db = self.tdb.getDB()
        item_lookup = _make_item_id_lookup(self.tdenv, db.cursor())
        station_lookup = _make_station_id_lookup(self.tdenv, db.cursor())
        last_station_update_times = _collect_station_modified_times(self.tdenv, db.cursor())
        
        cur_station = None
+        is_debug = self.tdenv.debug > 0
        self.tdenv.DEBUG0("Processing entries...")
-        with listings_path.open("r", encoding="utf-8", errors="ignore") as fh:
-            prog = pbar.Progress(total, 50)
-            
-            cursor: Optional[sqlite3.Cursor] = db.cursor()
+        
+        # Try to find a balance between doing too many commits where we fail
+        # to get any benefits from constructing transactions, and blowing up
+        # the WAL and memory usage by making massive transactions.
+        max_transaction_items, transaction_items = 32 * 1024, 0
+        with (pbar.Progress(total, 40, prefix="Processing", style=pbar.LongRunningCountBar) as prog,
+              listings_path.open("r", encoding="utf-8", errors="ignore") as fh):
+            cursor = db.cursor()
+            cursor.execute("BEGIN TRANSACTION")
            
            for listing in csv.DictReader(fh):
-                prog.increment(1, postfix = lambda value, total: f" {(value / total * 100):.0f}% {value} / {total}")
+                prog.increment(1)
                
                station_id = int(listing['station_id'])
                if station_id not in station_lookup:
@@ -227,16 +236,21 @@ def importListings(self, listings_file):
                
                if station_id != cur_station:
                    # commit anything from the previous station, get a new cursor
-                    db.commit()
-                    cur_station, skip_station, cursor = station_id, False, db.cursor()
+                    if transaction_items >= max_transaction_items:
+                        cursor.execute("COMMIT")
+                        transaction_items = 0
+                        cursor.execute("BEGIN TRANSACTION")
+                    cur_station, skip_station = station_id, False
                
                # Check if listing already exists in DB and needs updated.
                last_modified: int = int(last_station_update_times.get(station_id, 0))
                if last_modified:
                    # When the listings.csv data matches the database, update to make from_live == 0.
                    if listing_time == last_modified and not from_live:
-                        self.tdenv.DEBUG1(f"Marking {cur_station} as no longer 'live' (old={last_modified}, listing={listing_time}).")
+                        if is_debug:
+                            self.tdenv.DEBUG1(f"Marking {cur_station} as no longer 'live' (old={last_modified}, listing={listing_time}).")
                        cursor.execute(stmt_unliven_station, (cur_station,))
+                        transaction_items += 1
                        skip_station = True
                        continue
@@ -247,8 +261,10 @@ def importListings(self, listings_file):
                        continue
                    
                    # The data from the import file is newer, so we need to delete the old data for this station.
-                    self.tdenv.DEBUG1(f"Deleting old listing data for {cur_station} (old={last_modified}, listing={listing_time}).")
+                    if is_debug:
+                        self.tdenv.DEBUG1(f"Deleting old listing data for {cur_station} (old={last_modified}, listing={listing_time}).")
                    cursor.execute(stmt_flush_station, (cur_station,))
+                    transaction_items += 1
                    last_station_update_times[station_id] = listing_time
                
                # station skip lasts until we change station id.
@@ -268,20 +284,24 @@ def importListings(self, listings_file):
                supply_units = int(listing['supply'])
                supply_level = int(listing.get('supply_bracket') or '-1')
                
-                self.tdenv.DEBUG1(f"Inserting new listing data for {station_id}.")
+                if is_debug:
+                    self.tdenv.DEBUG1(f"Inserting new listing data for {station_id}.")
                cursor.execute(stmt_add_listing, (
                    station_id, item_id, listing_time, from_live,
                    demand_price, demand_units, demand_level,
                    supply_price, supply_units, supply_level,
                ))
-            
-            prog.clear()
-        
-        # Do a final commit to be sure
-        db.commit()
-        
-        self.tdenv.NOTE("Optimizing database...")
-        db.execute("VACUUM")
+                transaction_items += 1
+        
+        # These will take a little while, which has four steps, so we'll make it a counter.
+        with pbar.Progress(1, 40, prefix="Saving"):
+            # Do a final commit to be sure
+            cursor.execute("COMMIT")
+        
+        if self.getOption("optimize"):
+            with pbar.Progress(1, 40, prefix="Optimizing"):
+                db.execute("VACUUM")
        
        self.tdb.close()
        
        self.tdenv.NOTE("Finished processing market data. End time = {}", self.now())
```
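
One small detail worth noting in this diff is the `is_debug` hoist: an f-string argument is formatted before `DEBUG1` is even called, so when debug output is off that formatting is pure waste on a loop that runs once per listing row. A rough, self-contained illustration (the `Env` class is an invented stand-in for `TradeEnv`, and timings vary by machine):

```python
# Rough illustration of the is_debug guard: the f-string is built before
# DEBUG1 runs, so guarding the call skips per-row formatting entirely.
# Env is an invented stand-in, not the real TradeEnv.
import timeit

class Env:
    debug = 0  # debug output disabled, as in a normal import run
    def DEBUG1(self, msg: str) -> None:
        if self.debug >= 1:
            print(msg)

env = Env()
is_debug = env.debug > 0
station, listing_time = 128, 1714953600

unguarded = timeit.timeit(
    lambda: env.DEBUG1(f"Inserting new listing data for {station} at {listing_time}."),
    number=500_000,
)
guarded = timeit.timeit(
    lambda: env.DEBUG1(f"Inserting new listing data for {station} at {listing_time}.") if is_debug else None,
    number=500_000,
)
print(f"unguarded: {unguarded:.2f}s  guarded: {guarded:.2f}s")
```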
