Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions hathor/indexes/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from hathor.indexes.mempool_tips_index import MempoolTipsIndex
from hathor.indexes.nc_creation_index import NCCreationIndex
from hathor.indexes.nc_history_index import NCHistoryIndex
from hathor.indexes.scope import Scope
from hathor.indexes.timestamp_index import ScopeType as TimestampScopeType, TimestampIndex
from hathor.indexes.tokens_index import TokensIndex
from hathor.indexes.utxo_index import UtxoIndex
Expand Down Expand Up @@ -153,7 +154,8 @@ def _manually_initialize(self, tx_storage: 'TransactionStorage') -> None:
indexes_to_init.append(index)

if indexes_to_init:
self.log.info('there are indexes that need initialization', indexes_to_init=indexes_to_init)
indexes_names = [type(index).__name__ for index in indexes_to_init]
self.log.info('there are indexes that need initialization', indexes_to_init=indexes_names)
else:
self.log.info('there are no indexes that need initialization')

Expand All @@ -177,14 +179,20 @@ def _manually_initialize(self, tx_storage: 'TransactionStorage') -> None:
index.init_start(self)

if indexes_to_init:
overall_scope = reduce(operator.__or__, map(lambda i: i.get_scope(), indexes_to_init))
overall_scope: Scope = reduce(operator.__or__, map(lambda i: i.get_scope(), indexes_to_init))
tx_iter_inner = overall_scope.get_iterator(tx_storage)
tx_iter = tx_progress(tx_iter_inner, log=self.log, total=tx_storage.get_vertices_count())
tx_iter = tx_progress(
tx_iter_inner,
log=self.log,
total=tx_storage.get_vertices_count(),
show_height_and_ts=overall_scope.topological_order,
)
self.log.debug('indexes init', scope=overall_scope)
else:
tx_iter = iter([])
self.log.debug('indexes init')

self.log.info('initializing indexes...')
for tx in tx_iter:
# feed each transaction to the indexes that they are interested in
for index in indexes_to_init:
Expand Down
31 changes: 11 additions & 20 deletions hathor/indexes/scope.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,31 +61,22 @@ def matches(self, tx: BaseTransaction) -> bool:
return False
if not tx_meta.validation.is_fully_connected() and not self.include_partial:
return False
# XXX: self.topologial_order doesn't affect self.match()
# XXX: self.topological_order doesn't affect self.match()
# passed all checks
return True

def get_iterator(self, tx_storage: 'TransactionStorage') -> Iterator[BaseTransaction]:
        """ This method returns an iterator that only yields transactions that match the current scope.
"""
iterator: Iterator[BaseTransaction]
# XXX: this is to mark if the chosen iterator will yield partial transactions
iterator_covers_partial: bool
if self.topological_order:
iterator = tx_storage.topological_iterator()
iterator_covers_partial = False
else:
iterator = tx_storage.get_all_transactions()
iterator_covers_partial = True
for tx in iterator:
if self.matches(tx):
yield tx
if self.include_partial and not iterator_covers_partial:
# if partial transactions are needed and were not already covered, we use get_all_transactions, which
# includes partial transactions, to yield them, skipping all that aren't partial
        This method returns an iterator that yields all transactions with respect to this Scope's ordering only,
        disregarding whether the tx matches the Scope or not. It's the caller's responsibility to match them.
"""
if not self.topological_order:
yield from tx_storage.get_all_transactions()
return

yield from tx_storage.topological_iterator()
if self.include_partial:
for tx in tx_storage.get_all_transactions():
tx_meta = tx.get_metadata()
if tx_meta.validation.is_fully_connected():
continue
if self.matches(tx):
if not tx_meta.validation.is_fully_connected():
yield tx
36 changes: 26 additions & 10 deletions hathor/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ def skip_until(it: Iterator[T], condition: Callable[[T], bool]) -> Iterator[T]:


_DT_ITER_NEXT_WARN = 3 # time in seconds to warn when `next(iter_tx)` takes too long
_DT_LOG_PROGRESS = 30 # time in seconds after which a progress will be logged (it can take longer, but not shorter)
_DT_LOG_PROGRESS = 10 # time in seconds after which a progress will be logged (it can take longer, but not shorter)
_DT_YIELD_WARN = 1 # time in seconds to warn when `yield tx` takes too long (which is when processing happens)


Expand Down Expand Up @@ -455,18 +455,28 @@ def progress(
log.info('loaded', count=count, rate=rate, total_dt=dt_total)


def tx_progress(iter_tx: Iterator['BaseTransaction'], *, log: Optional['structlog.stdlib.BoundLogger'] = None,
total: Optional[int] = None) -> Iterator['BaseTransaction']:
def tx_progress(
iter_tx: Iterator['BaseTransaction'],
*,
log: Optional['structlog.stdlib.BoundLogger'] = None,
total: Optional[int] = None,
show_height_and_ts: bool = False,
) -> Iterator['BaseTransaction']:
""" Log the progress of a transaction iterator while iterating.
"""
if log is None:
log = logger.new()

yield from _tx_progress(iter_tx, log=log, total=total)
yield from _tx_progress(iter_tx, log=log, total=total, show_height_and_ts=show_height_and_ts)


def _tx_progress(iter_tx: Iterator['BaseTransaction'], *, log: 'structlog.stdlib.BoundLogger', total: Optional[int]
) -> Iterator['BaseTransaction']:
def _tx_progress(
iter_tx: Iterator['BaseTransaction'],
*,
log: 'structlog.stdlib.BoundLogger',
total: Optional[int],
show_height_and_ts: bool,
) -> Iterator['BaseTransaction']:
""" Inner implementation of progress helper.
"""
t_start = time.time()
Expand All @@ -477,6 +487,7 @@ def _tx_progress(iter_tx: Iterator['BaseTransaction'], *, log: 'structlog.stdlib
count_log_prev = 0
block_count = 0
tx_count = 0
first_log = True

log.debug('load will start')
t_log_prev = t_start
Expand All @@ -499,12 +510,15 @@ def _tx_progress(iter_tx: Iterator['BaseTransaction'], *, log: 'structlog.stdlib

t_log = time.time()
dt_log = LogDuration(t_log - t_log_prev)
if dt_log > _DT_LOG_PROGRESS:
if first_log or dt_log > _DT_LOG_PROGRESS:
first_log = False
t_log_prev = t_log
dcount = count - count_log_prev
tx_rate = '?' if dt_log == 0 else dcount / dt_log
ts = datetime.datetime.fromtimestamp(ts_tx)
kwargs = dict(tx_rate=tx_rate, tx_new=dcount, dt=dt_log, total=count, latest_ts=ts, height=h)
kwargs: dict[str, Any] = dict(tx_rate=tx_rate, tx_new=dcount, dt=dt_log, total=count)
if show_height_and_ts:
kwargs.update(latest_ts=ts, height=h)
if total:
progress_ = count / total
elapsed_time = t_log - t_start
Expand All @@ -515,7 +529,6 @@ def _tx_progress(iter_tx: Iterator['BaseTransaction'], *, log: 'structlog.stdlib
remaining_time = LogDuration(elapsed_time / progress_ - elapsed_time)
log.info(
f'loading... {math.floor(progress_ * 100):2.0f}%',
progress=progress_,
remaining_time=remaining_time,
**kwargs
)
Expand Down Expand Up @@ -544,7 +557,10 @@ def _tx_progress(iter_tx: Iterator['BaseTransaction'], *, log: 'structlog.stdlib
t_final = time.time()
dt_total = LogDuration(t_final - t_start)
tx_rate = '?' if dt_total == 0 else count / dt_total
log.info('loaded', tx_count=count, tx_rate=tx_rate, total_dt=dt_total, height=h, blocks=block_count, txs=tx_count)
kwargs = dict(tx_count=count, tx_rate=tx_rate, total_dt=dt_total, blocks=block_count, txs=tx_count)
if show_height_and_ts:
kwargs.update(height=h)
log.info('loaded', **kwargs)


class peekable(Iterator[T]):
Expand Down
Loading