From 982fccd19bde1d5db68c1656afc721acdfcb733f Mon Sep 17 00:00:00 2001 From: Gabriel Levcovitz Date: Mon, 5 Jun 2023 00:54:27 -0300 Subject: [PATCH 1/4] refactor: move progress function to utils --- hathor/indexes/partial_rocksdb_tips_index.py | 53 +------------------- hathor/util.py | 53 ++++++++++++++++++++ 2 files changed, 55 insertions(+), 51 deletions(-) diff --git a/hathor/indexes/partial_rocksdb_tips_index.py b/hathor/indexes/partial_rocksdb_tips_index.py index b41252d11..390db3f5b 100644 --- a/hathor/indexes/partial_rocksdb_tips_index.py +++ b/hathor/indexes/partial_rocksdb_tips_index.py @@ -23,7 +23,7 @@ from hathor.indexes.memory_tips_index import MemoryTipsIndex from hathor.indexes.rocksdb_utils import RocksDBIndexUtils from hathor.indexes.tips_index import ScopeType -from hathor.util import LogDuration +from hathor.util import LogDuration, generic_progress if TYPE_CHECKING: # pragma: no cover import rocksdb @@ -49,55 +49,6 @@ def _from_db_value(i: int) -> Union[int, float]: return i -def progress(iter_iv: Iterator[Interval], *, log: 'structlog.stdlib.BoundLogger', total: Optional[int], - ) -> Iterator[Interval]: - """ Implementation of progress helper for using with loading the interval tree. - - This is basically a stripped down version of `hathor.util.progress` - """ - t_start = time.time() - count = 0 - count_log_prev = 0 - if total is not None: - log.info('loading... 0%', progress=0) - else: - log.info('loading...') - t_log_prev = t_start - while True: - try: - iv = next(iter_iv) - except StopIteration: - break - - t_log = time.time() - dt_log = LogDuration(t_log - t_log_prev) - if dt_log > _DT_LOG_PROGRESS: - t_log_prev = t_log - dcount = count - count_log_prev - rate = '?' if dt_log == 0 else dcount / dt_log - kwargs = dict(rate=rate, iv_new=dcount, dt=dt_log, total=count) - if total is not None: - progress = count / total - # TODO: we could add an ETA since we know the total - log.info(f'loading... {math.floor(progress * 100):2.0f}%', progress=progress, **kwargs) - else: - log.info('loading...', **kwargs) - count_log_prev = count - count += 1 - - yield iv - - t_final = time.time() - dt_total = LogDuration(t_final - t_start) - rate = '?' if dt_total == 0 else count / dt_total - if total is not None: - progress = count / total - log.info(f'loaded... {math.floor(progress * 100):2.0f}%', progress=progress, count=count, rate=rate, - total_dt=dt_total) - else: - log.info('loaded', count=count, rate=rate, total_dt=dt_total) - - class PartialRocksDBTipsIndex(MemoryTipsIndex, RocksDBIndexUtils): """ Partial memory-rocksdb implementation @@ -152,7 +103,7 @@ def init_start(self, indexes_manager: 'IndexesManager') -> None: else: log.info('index not identified, skipping total count') total = None - for iv in progress(self._iter_intervals_db(), log=log, total=total): + for iv in generic_progress(self._iter_intervals_db(), log=log, total=total): self.tree.add(iv) self.tx_last_interval[iv.data] = iv diff --git a/hathor/util.py b/hathor/util.py index 22f3abf50..9b686f7c5 100644 --- a/hathor/util.py +++ b/hathor/util.py @@ -440,6 +440,59 @@ def verified_cast(interface_class: Type[Z], obj: Any) -> Z: _DT_YIELD_WARN = 1 # time in seconds to warn when `yield tx` takes too long (which is when processing happens) +def generic_progress( + it: Iterator[T], + *, + log: 'structlog.stdlib.BoundLogger', + total: Optional[int] +) -> Iterator[T]: + """ Implementation of progress helper for using with a generic type. + + This is basically a stripped down version of `hathor.util.progress` + """ + t_start = time.time() + count = 0 + count_log_prev = 0 + if total is not None: + log.info('loading... 0%', progress=0) + else: + log.info('loading...') + t_log_prev = t_start + while True: + try: + item = next(it) + except StopIteration: + break + + t_log = time.time() + dt_log = LogDuration(t_log - t_log_prev) + if dt_log > _DT_LOG_PROGRESS: + t_log_prev = t_log + dcount = count - count_log_prev + rate = '?' if dt_log == 0 else dcount / dt_log + kwargs = dict(rate=rate, new=dcount, dt=dt_log, total=count) + if total is not None: + progress_ = count / total + # TODO: we could add an ETA since we know the total + log.info(f'loading... {math.floor(progress_ * 100):2.0f}%', progress=progress_, **kwargs) + else: + log.info('loading...', **kwargs) + count_log_prev = count + count += 1 + + yield item + + t_final = time.time() + dt_total = LogDuration(t_final - t_start) + rate = '?' if dt_total == 0 else count / dt_total + if total is not None: + progress_ = count / total + log.info(f'loaded... {math.floor(progress_ * 100):2.0f}%', progress=progress_, count=count, rate=rate, + total_dt=dt_total) + else: + log.info('loaded', count=count, rate=rate, total_dt=dt_total) + + def progress(iter_tx: Iterator['BaseTransaction'], *, log: Optional['structlog.stdlib.BoundLogger'] = None, total: Optional[int] = None) -> Iterator['BaseTransaction']: """ Log the progress of a transaction iterator while iterating. From 3bde337cc4d94d9ee184f9d7de7775ae4219ef05 Mon Sep 17 00:00:00 2001 From: Gabriel Levcovitz Date: Mon, 5 Jun 2023 01:05:34 -0300 Subject: [PATCH 2/4] refactor: add ETA to progress --- hathor/indexes/partial_rocksdb_tips_index.py | 4 +--- hathor/util.py | 20 ++++++++++++++++---- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/hathor/indexes/partial_rocksdb_tips_index.py b/hathor/indexes/partial_rocksdb_tips_index.py index 390db3f5b..842bc1a08 100644 --- a/hathor/indexes/partial_rocksdb_tips_index.py +++ b/hathor/indexes/partial_rocksdb_tips_index.py @@ -13,17 +13,15 @@ # limitations under the License. import math -import time from typing import TYPE_CHECKING, Dict, Iterator, Optional, Union -import structlog from intervaltree import Interval, IntervalTree from structlog import get_logger from hathor.indexes.memory_tips_index import MemoryTipsIndex from hathor.indexes.rocksdb_utils import RocksDBIndexUtils from hathor.indexes.tips_index import ScopeType -from hathor.util import LogDuration, generic_progress +from hathor.util import generic_progress if TYPE_CHECKING: # pragma: no cover import rocksdb diff --git a/hathor/util.py b/hathor/util.py index 9b686f7c5..ee79d7051 100644 --- a/hathor/util.py +++ b/hathor/util.py @@ -473,8 +473,14 @@ def generic_progress( kwargs = dict(rate=rate, new=dcount, dt=dt_log, total=count) if total is not None: progress_ = count / total - # TODO: we could add an ETA since we know the total - log.info(f'loading... {math.floor(progress_ * 100):2.0f}%', progress=progress_, **kwargs) + elapsed_time = t_log - t_start + remaining_time = LogDuration(elapsed_time / progress_ - elapsed_time) + log.info( + f'loading... {math.floor(progress_ * 100):2.0f}%', + progress=progress_, + remaining_time=remaining_time, + **kwargs + ) else: log.info('loading...', **kwargs) count_log_prev = count @@ -544,8 +550,14 @@ def _progress(iter_tx: Iterator['BaseTransaction'], *, log: 'structlog.stdlib.Bo kwargs = dict(tx_rate=tx_rate, tx_new=dcount, dt=dt_log, total=count, latest_ts=ts, height=h) if total is not None: progress = count / total - # TODO: we could add an ETA since we know the total - log.info(f'loading... {math.floor(progress * 100):2.0f}%', progress=progress, **kwargs) + elapsed_time = t_log - t_start + remaining_time = LogDuration(elapsed_time / progress - elapsed_time) + log.info( + f'loading... {math.floor(progress * 100):2.0f}%', + progress=progress, + remaining_time=remaining_time, + **kwargs + ) else: log.info('loading...', **kwargs) count_log_prev = count From 69151c0f2afb4567354216633f06fa4dcd6cdd96 Mon Sep 17 00:00:00 2001 From: Gabriel Levcovitz Date: Mon, 5 Jun 2023 17:36:17 -0300 Subject: [PATCH 3/4] chore: change function names --- hathor/cli/db_export.py | 4 ++-- hathor/cli/db_import.py | 4 ++-- hathor/indexes/manager.py | 4 ++-- hathor/indexes/partial_rocksdb_tips_index.py | 4 ++-- hathor/util.py | 24 ++++++++++---------- 5 files changed, 20 insertions(+), 20 deletions(-) diff --git a/hathor/cli/db_export.py b/hathor/cli/db_export.py index 18e93bfcd..156069acf 100644 --- a/hathor/cli/db_export.py +++ b/hathor/cli/db_export.py @@ -95,7 +95,7 @@ def iter_tx(self) -> Iterator['BaseTransaction']: yield tx def run(self) -> None: - from hathor.util import progress + from hathor.util import tx_progress self.log.info('export') self.out_file.write(MAGIC_HEADER) tx_count = 0 @@ -108,7 +108,7 @@ def run(self) -> None: # estimated total, this will obviously be wrong if we're not exporting everything, but it's still better than # nothing, and it's probably better to finish sooner than expected, rather than later than expected total = self.tx_storage.get_vertices_count() - for tx in progress(self.iter_tx(), log=self.log, total=total): + for tx in tx_progress(self.iter_tx(), log=self.log, total=total): assert tx.hash is not None tx_meta = tx.get_metadata() if tx.is_block: diff --git a/hathor/cli/db_import.py b/hathor/cli/db_import.py index 1c581e3c4..f0057d0f2 100644 --- a/hathor/cli/db_import.py +++ b/hathor/cli/db_import.py @@ -46,7 +46,7 @@ def prepare(self, args: Namespace, *, register_resources: bool = True) -> None: self.in_file = io.BufferedReader(args.import_file) def run(self) -> None: - from hathor.util import progress + from hathor.util import tx_progress header = self.in_file.read(len(MAGIC_HEADER)) if header != MAGIC_HEADER: @@ -60,7 +60,7 @@ def run(self) -> None: self.tx_storage.pre_init() actual_tx_count = 0 actual_block_count = 0 - for tx in progress(self._import_txs(), log=self.log, total=total): + for tx in tx_progress(self._import_txs(), log=self.log, total=total): if tx.is_block: actual_block_count += 1 else: diff --git a/hathor/indexes/manager.py b/hathor/indexes/manager.py index 98aca2440..43c3552dc 100644 --- a/hathor/indexes/manager.py +++ b/hathor/indexes/manager.py @@ -30,7 +30,7 @@ from hathor.indexes.tokens_index import TokensIndex from hathor.indexes.utxo_index import UtxoIndex from hathor.transaction import BaseTransaction -from hathor.util import progress +from hathor.util import tx_progress if TYPE_CHECKING: # pragma: no cover import rocksdb @@ -173,7 +173,7 @@ def _manually_initialize(self, tx_storage: 'TransactionStorage') -> None: if indexes_to_init: overall_scope = reduce(operator.__or__, map(lambda i: i.get_scope(), indexes_to_init)) tx_iter_inner = overall_scope.get_iterator(tx_storage) - tx_iter = progress(tx_iter_inner, log=self.log, total=tx_storage.get_vertices_count()) + tx_iter = tx_progress(tx_iter_inner, log=self.log, total=tx_storage.get_vertices_count()) self.log.debug('indexes init', scope=overall_scope) else: tx_iter = iter([]) diff --git a/hathor/indexes/partial_rocksdb_tips_index.py b/hathor/indexes/partial_rocksdb_tips_index.py index 842bc1a08..8757705b1 100644 --- a/hathor/indexes/partial_rocksdb_tips_index.py +++ b/hathor/indexes/partial_rocksdb_tips_index.py @@ -21,7 +21,7 @@ from hathor.indexes.memory_tips_index import MemoryTipsIndex from hathor.indexes.rocksdb_utils import RocksDBIndexUtils from hathor.indexes.tips_index import ScopeType -from hathor.util import generic_progress +from hathor.util import progress if TYPE_CHECKING: # pragma: no cover import rocksdb @@ -101,7 +101,7 @@ def init_start(self, indexes_manager: 'IndexesManager') -> None: else: log.info('index not identified, skipping total count') total = None - for iv in generic_progress(self._iter_intervals_db(), log=log, total=total): + for iv in progress(self._iter_intervals_db(), log=log, total=total): self.tree.add(iv) self.tx_last_interval[iv.data] = iv diff --git a/hathor/util.py b/hathor/util.py index ee79d7051..c755992b1 100644 --- a/hathor/util.py +++ b/hathor/util.py @@ -440,13 +440,13 @@ def verified_cast(interface_class: Type[Z], obj: Any) -> Z: _DT_YIELD_WARN = 1 # time in seconds to warn when `yield tx` takes too long (which is when processing happens) -def generic_progress( +def progress( it: Iterator[T], *, log: 'structlog.stdlib.BoundLogger', total: Optional[int] ) -> Iterator[T]: - """ Implementation of progress helper for using with a generic type. + """ Implementation of progress helper for using with an iterator of any type. This is basically a stripped down version of `hathor.util.progress` """ @@ -499,19 +499,19 @@ def generic_progress( log.info('loaded', count=count, rate=rate, total_dt=dt_total) -def progress(iter_tx: Iterator['BaseTransaction'], *, log: Optional['structlog.stdlib.BoundLogger'] = None, - total: Optional[int] = None) -> Iterator['BaseTransaction']: +def tx_progress(iter_tx: Iterator['BaseTransaction'], *, log: Optional['structlog.stdlib.BoundLogger'] = None, + total: Optional[int] = None) -> Iterator['BaseTransaction']: """ Log the progress of a transaction iterator while iterating. """ if log is None: log = logger.new() - yield from _progress(iter_tx, log=log, total=total) + yield from _tx_progress(iter_tx, log=log, total=total) -def _progress(iter_tx: Iterator['BaseTransaction'], *, log: 'structlog.stdlib.BoundLogger', total: Optional[int] - ) -> Iterator['BaseTransaction']: - """ Inner implementation of progress helper, it expects the gc to be disabled. +def _tx_progress(iter_tx: Iterator['BaseTransaction'], *, log: 'structlog.stdlib.BoundLogger', total: Optional[int] + ) -> Iterator['BaseTransaction']: + """ Inner implementation of progress helper. """ t_start = time.time() h = 0 @@ -549,12 +549,12 @@ def _progress(iter_tx: Iterator['BaseTransaction'], *, log: 'structlog.stdlib.Bo ts = datetime.datetime.fromtimestamp(ts_tx) kwargs = dict(tx_rate=tx_rate, tx_new=dcount, dt=dt_log, total=count, latest_ts=ts, height=h) if total is not None: - progress = count / total + progress_ = count / total elapsed_time = t_log - t_start - remaining_time = LogDuration(elapsed_time / progress - elapsed_time) + remaining_time = LogDuration(elapsed_time / progress_ - elapsed_time) log.info( - f'loading... {math.floor(progress * 100):2.0f}%', - progress=progress, + f'loading... {math.floor(progress_ * 100):2.0f}%', + progress=progress_, remaining_time=remaining_time, **kwargs ) From 75219499212f10a5e591c40d52c881c0c409fec2 Mon Sep 17 00:00:00 2001 From: Gabriel Levcovitz Date: Wed, 7 Jun 2023 12:23:58 -0300 Subject: [PATCH 4/4] fix total divide by zero --- hathor/util.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/hathor/util.py b/hathor/util.py index c755992b1..7f9ed0344 100644 --- a/hathor/util.py +++ b/hathor/util.py @@ -453,7 +453,7 @@ def progress( t_start = time.time() count = 0 count_log_prev = 0 - if total is not None: + if total: log.info('loading... 0%', progress=0) else: log.info('loading...') @@ -471,7 +471,7 @@ def progress( dcount = count - count_log_prev rate = '?' if dt_log == 0 else dcount / dt_log kwargs = dict(rate=rate, new=dcount, dt=dt_log, total=count) - if total is not None: + if total: progress_ = count / total elapsed_time = t_log - t_start remaining_time = LogDuration(elapsed_time / progress_ - elapsed_time) @@ -491,7 +491,7 @@ def progress( t_final = time.time() dt_total = LogDuration(t_final - t_start) rate = '?' if dt_total == 0 else count / dt_total - if total is not None: + if total: progress_ = count / total log.info(f'loaded... {math.floor(progress_ * 100):2.0f}%', progress=progress_, count=count, rate=rate, total_dt=dt_total) @@ -548,7 +548,7 @@ def _tx_progress(iter_tx: Iterator['BaseTransaction'], *, log: 'structlog.stdlib tx_rate = '?' if dt_log == 0 else dcount / dt_log ts = datetime.datetime.fromtimestamp(ts_tx) kwargs = dict(tx_rate=tx_rate, tx_new=dcount, dt=dt_log, total=count, latest_ts=ts, height=h) - if total is not None: + if total: progress_ = count / total elapsed_time = t_log - t_start remaining_time = LogDuration(elapsed_time / progress_ - elapsed_time)