Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions hathor/cli/db_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def iter_tx(self) -> Iterator['BaseTransaction']:
yield tx

def run(self) -> None:
from hathor.util import progress
from hathor.util import tx_progress
self.log.info('export')
self.out_file.write(MAGIC_HEADER)
tx_count = 0
Expand All @@ -108,7 +108,7 @@ def run(self) -> None:
# estimated total, this will obviously be wrong if we're not exporting everything, but it's still better than
# nothing, and it's probably better to finish sooner than expected, rather than later than expected
total = self.tx_storage.get_vertices_count()
for tx in progress(self.iter_tx(), log=self.log, total=total):
for tx in tx_progress(self.iter_tx(), log=self.log, total=total):
assert tx.hash is not None
tx_meta = tx.get_metadata()
if tx.is_block:
Expand Down
4 changes: 2 additions & 2 deletions hathor/cli/db_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def prepare(self, args: Namespace, *, register_resources: bool = True) -> None:
self.in_file = io.BufferedReader(args.import_file)

def run(self) -> None:
from hathor.util import progress
from hathor.util import tx_progress

header = self.in_file.read(len(MAGIC_HEADER))
if header != MAGIC_HEADER:
Expand All @@ -60,7 +60,7 @@ def run(self) -> None:
self.tx_storage.pre_init()
actual_tx_count = 0
actual_block_count = 0
for tx in progress(self._import_txs(), log=self.log, total=total):
for tx in tx_progress(self._import_txs(), log=self.log, total=total):
if tx.is_block:
actual_block_count += 1
else:
Expand Down
4 changes: 2 additions & 2 deletions hathor/indexes/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from hathor.indexes.tokens_index import TokensIndex
from hathor.indexes.utxo_index import UtxoIndex
from hathor.transaction import BaseTransaction
from hathor.util import progress
from hathor.util import tx_progress

if TYPE_CHECKING: # pragma: no cover
import rocksdb
Expand Down Expand Up @@ -173,7 +173,7 @@ def _manually_initialize(self, tx_storage: 'TransactionStorage') -> None:
if indexes_to_init:
overall_scope = reduce(operator.__or__, map(lambda i: i.get_scope(), indexes_to_init))
tx_iter_inner = overall_scope.get_iterator(tx_storage)
tx_iter = progress(tx_iter_inner, log=self.log, total=tx_storage.get_vertices_count())
tx_iter = tx_progress(tx_iter_inner, log=self.log, total=tx_storage.get_vertices_count())
self.log.debug('indexes init', scope=overall_scope)
else:
tx_iter = iter([])
Expand Down
53 changes: 1 addition & 52 deletions hathor/indexes/partial_rocksdb_tips_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,15 @@
# limitations under the License.

import math
import time
from typing import TYPE_CHECKING, Dict, Iterator, Optional, Union

import structlog
from intervaltree import Interval, IntervalTree
from structlog import get_logger

from hathor.indexes.memory_tips_index import MemoryTipsIndex
from hathor.indexes.rocksdb_utils import RocksDBIndexUtils
from hathor.indexes.tips_index import ScopeType
from hathor.util import LogDuration
from hathor.util import progress

if TYPE_CHECKING: # pragma: no cover
import rocksdb
Expand All @@ -49,55 +47,6 @@ def _from_db_value(i: int) -> Union[int, float]:
return i


def progress(iter_iv: Iterator[Interval], *, log: 'structlog.stdlib.BoundLogger', total: Optional[int],
) -> Iterator[Interval]:
""" Implementation of progress helper for using with loading the interval tree.

This is basically a stripped down version of `hathor.util.progress`
"""
t_start = time.time()
count = 0
count_log_prev = 0
if total is not None:
log.info('loading... 0%', progress=0)
else:
log.info('loading...')
t_log_prev = t_start
while True:
try:
iv = next(iter_iv)
except StopIteration:
break

t_log = time.time()
dt_log = LogDuration(t_log - t_log_prev)
if dt_log > _DT_LOG_PROGRESS:
t_log_prev = t_log
dcount = count - count_log_prev
rate = '?' if dt_log == 0 else dcount / dt_log
kwargs = dict(rate=rate, iv_new=dcount, dt=dt_log, total=count)
if total is not None:
progress = count / total
# TODO: we could add an ETA since we know the total
log.info(f'loading... {math.floor(progress * 100):2.0f}%', progress=progress, **kwargs)
else:
log.info('loading...', **kwargs)
count_log_prev = count
count += 1

yield iv

t_final = time.time()
dt_total = LogDuration(t_final - t_start)
rate = '?' if dt_total == 0 else count / dt_total
if total is not None:
progress = count / total
log.info(f'loaded... {math.floor(progress * 100):2.0f}%', progress=progress, count=count, rate=rate,
total_dt=dt_total)
else:
log.info('loaded', count=count, rate=rate, total_dt=dt_total)


class PartialRocksDBTipsIndex(MemoryTipsIndex, RocksDBIndexUtils):
""" Partial memory-rocksdb implementation

Expand Down
85 changes: 75 additions & 10 deletions hathor/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,19 +440,78 @@ def verified_cast(interface_class: Type[Z], obj: Any) -> Z:
_DT_YIELD_WARN = 1 # time in seconds to warn when `yield tx` takes too long (which is when processing happens)


def progress(iter_tx: Iterator['BaseTransaction'], *, log: Optional['structlog.stdlib.BoundLogger'] = None,
total: Optional[int] = None) -> Iterator['BaseTransaction']:
def progress(
it: Iterator[T],
*,
log: 'structlog.stdlib.BoundLogger',
total: Optional[int]
) -> Iterator[T]:
""" Implementation of progress helper for using with an iterator of any type.

This is basically a stripped down version of `hathor.util.progress`
"""
t_start = time.time()
count = 0
count_log_prev = 0
if total:
log.info('loading... 0%', progress=0)
else:
log.info('loading...')
t_log_prev = t_start
while True:
try:
item = next(it)
except StopIteration:
break

t_log = time.time()
dt_log = LogDuration(t_log - t_log_prev)
if dt_log > _DT_LOG_PROGRESS:
t_log_prev = t_log
dcount = count - count_log_prev
rate = '?' if dt_log == 0 else dcount / dt_log
kwargs = dict(rate=rate, new=dcount, dt=dt_log, total=count)
if total:
progress_ = count / total
elapsed_time = t_log - t_start
remaining_time = LogDuration(elapsed_time / progress_ - elapsed_time)
log.info(
f'loading... {math.floor(progress_ * 100):2.0f}%',
progress=progress_,
remaining_time=remaining_time,
**kwargs
)
else:
log.info('loading...', **kwargs)
count_log_prev = count
count += 1

yield item

t_final = time.time()
dt_total = LogDuration(t_final - t_start)
rate = '?' if dt_total == 0 else count / dt_total
if total:
progress_ = count / total
log.info(f'loaded... {math.floor(progress_ * 100):2.0f}%', progress=progress_, count=count, rate=rate,
total_dt=dt_total)
else:
log.info('loaded', count=count, rate=rate, total_dt=dt_total)


def tx_progress(iter_tx: Iterator['BaseTransaction'], *, log: Optional['structlog.stdlib.BoundLogger'] = None,
total: Optional[int] = None) -> Iterator['BaseTransaction']:
""" Log the progress of a transaction iterator while iterating.
"""
if log is None:
log = logger.new()

yield from _progress(iter_tx, log=log, total=total)
yield from _tx_progress(iter_tx, log=log, total=total)


def _progress(iter_tx: Iterator['BaseTransaction'], *, log: 'structlog.stdlib.BoundLogger', total: Optional[int]
) -> Iterator['BaseTransaction']:
""" Inner implementation of progress helper, it expects the gc to be disabled.
def _tx_progress(iter_tx: Iterator['BaseTransaction'], *, log: 'structlog.stdlib.BoundLogger', total: Optional[int]
) -> Iterator['BaseTransaction']:
""" Inner implementation of progress helper.
"""
t_start = time.time()
h = 0
Expand Down Expand Up @@ -489,10 +548,16 @@ def _progress(iter_tx: Iterator['BaseTransaction'], *, log: 'structlog.stdlib.Bo
tx_rate = '?' if dt_log == 0 else dcount / dt_log
ts = datetime.datetime.fromtimestamp(ts_tx)
kwargs = dict(tx_rate=tx_rate, tx_new=dcount, dt=dt_log, total=count, latest_ts=ts, height=h)
if total is not None:
progress = count / total
# TODO: we could add an ETA since we know the total
log.info(f'loading... {math.floor(progress * 100):2.0f}%', progress=progress, **kwargs)
if total:
progress_ = count / total
elapsed_time = t_log - t_start
remaining_time = LogDuration(elapsed_time / progress_ - elapsed_time)
log.info(
f'loading... {math.floor(progress_ * 100):2.0f}%',
progress=progress_,
remaining_time=remaining_time,
**kwargs
)
else:
log.info('loading...', **kwargs)
count_log_prev = count
Expand Down