Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,24 @@ pytest_flags = -p no:warnings --cov-report=term --cov-report=html --cov-report=x

.PHONY: tests-cli
tests-cli:
pytest -n auto --durations=10 --cov=hathor/cli/ --cov-config=.coveragerc_full --cov-fail-under=27 -p no:warnings $(tests_cli)
pytest --durations=10 --cov=hathor/cli/ --cov-config=.coveragerc_full --cov-fail-under=27 -p no:warnings $(tests_cli)

.PHONY: tests-doctests
tests-doctests:
pytest --durations=10 $(pytest_flags) --doctest-modules hathor

.PHONY: tests-lib
tests-lib:
pytest -n auto --durations=10 $(pytest_flags) --doctest-modules hathor $(tests_lib)
pytest --durations=10 $(pytest_flags) --doctest-modules hathor $(tests_lib)

.PHONY: tests-quick
tests-quick:
pytest --durations=10 $(pytest_flags) --doctest-modules hathor $(tests_lib) --maxfail=1 -m "not slow"

.PHONY: tests-genesis
tests-genesis:
HATHOR_TEST_CONFIG_FILE=hathor.conf.mainnet pytest -n auto tests/tx/test_genesis.py
HATHOR_TEST_CONFIG_FILE=hathor.conf.testnet pytest -n auto tests/tx/test_genesis.py
HATHOR_TEST_CONFIG_FILE=hathor.conf.mainnet pytest tests/tx/test_genesis.py
HATHOR_TEST_CONFIG_FILE=hathor.conf.testnet pytest tests/tx/test_genesis.py

.PHONY: tests
tests: tests-cli tests-lib tests-genesis
Expand Down
10 changes: 10 additions & 0 deletions hathor/indexes/address_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from structlog import get_logger

from hathor.indexes.scope import Scope
from hathor.indexes.tx_group_index import TxGroupIndex
from hathor.pubsub import HathorEvents
from hathor.transaction import BaseTransaction
Expand All @@ -26,12 +27,21 @@

logger = get_logger()

SCOPE = Scope(
include_blocks=True,
include_txs=True,
include_voided=True,
)


class AddressIndex(TxGroupIndex[str]):
""" Index of inputs/outputs by address
"""
pubsub: Optional['PubSubManager']

def get_scope(self) -> Scope:
return SCOPE

def init_loop_step(self, tx: BaseTransaction) -> None:
self.add_tx(tx)

Expand Down
6 changes: 6 additions & 0 deletions hathor/indexes/base_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Optional

from hathor.indexes.scope import Scope
from hathor.transaction.base_transaction import BaseTransaction

if TYPE_CHECKING: # pragma: no cover
Expand All @@ -35,6 +36,11 @@ def init_start(self, indexes_manager: 'IndexesManager') -> None:
"""
pass

@abstractmethod
def get_scope(self) -> Scope:
""" Returns the scope of interest of this index, whether the scope is configurable is up to the index."""
raise NotImplementedError

@abstractmethod
def get_db_name(self) -> Optional[str]:
""" The returned string is used to generate the relevant attributes for storing an indexe's state in the db.
Expand Down
11 changes: 11 additions & 0 deletions hathor/indexes/deps_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from typing import TYPE_CHECKING, Iterator, List

from hathor.indexes.base_index import BaseIndex
from hathor.indexes.scope import Scope
from hathor.transaction import BaseTransaction, Block

if TYPE_CHECKING: # pragma: no cover
Expand All @@ -25,6 +26,13 @@
# XXX: this arbitrary height limit must fit in a u32 (4-bytes unsigned), so it can be stored easily on rocksdb
INF_HEIGHT: int = 2**32 - 1

SCOPE = Scope(
include_blocks=True,
include_txs=True,
include_voided=True,
include_partial=True
)


def get_requested_from_height(tx: BaseTransaction) -> int:
"""Return the height of the block that requested (directly or indirectly) the download of this transaction.
Expand Down Expand Up @@ -105,6 +113,9 @@ class DepsIndex(BaseIndex):
them.
"""

def get_scope(self) -> Scope:
return SCOPE

def init_loop_step(self, tx: BaseTransaction) -> None:
tx_meta = tx.get_metadata()
if tx_meta.voided_by:
Expand Down
10 changes: 10 additions & 0 deletions hathor/indexes/height_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,17 @@
from typing import List, NamedTuple, Optional, Tuple

from hathor.indexes.base_index import BaseIndex
from hathor.indexes.scope import Scope
from hathor.transaction import BaseTransaction, Block
from hathor.transaction.genesis import BLOCK_GENESIS
from hathor.util import not_none

SCOPE = Scope(
include_blocks=True,
include_txs=False,
include_voided=True,
)


class IndexEntry(NamedTuple):
"""Helper named tuple that implementations can use."""
Expand All @@ -40,6 +47,9 @@ class HeightIndex(BaseIndex):
"""Store the block hash for each given height
"""

def get_scope(self) -> Scope:
return SCOPE

def init_loop_step(self, tx: BaseTransaction) -> None:
if not tx.is_block:
return
Expand Down
12 changes: 12 additions & 0 deletions hathor/indexes/info_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,19 @@
from structlog import get_logger

from hathor.indexes.base_index import BaseIndex
from hathor.indexes.scope import Scope
from hathor.transaction import BaseTransaction

logger = get_logger()

SCOPE = Scope(
include_blocks=True,
include_txs=True,
include_voided=True,
# XXX: this index doesn't care about the ordering
topological_order=False,
)


class InfoIndex(BaseIndex):
""" Index of general information about the storage
Expand All @@ -30,6 +39,9 @@ def init_loop_step(self, tx: BaseTransaction) -> None:
self.update_timestamps(tx)
self.update_counts(tx)

def get_scope(self) -> Scope:
return SCOPE

@abstractmethod
def update_timestamps(self, tx: BaseTransaction) -> None:
raise NotImplementedError
Expand Down
113 changes: 46 additions & 67 deletions hathor/indexes/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import operator
from abc import ABC, abstractmethod
from enum import Enum, auto
from typing import TYPE_CHECKING, Iterator, List, Optional, Tuple
from functools import reduce
from typing import TYPE_CHECKING, Iterator, List, Optional

from structlog import get_logger

Expand All @@ -24,8 +25,8 @@
from hathor.indexes.height_index import HeightIndex
from hathor.indexes.info_index import InfoIndex
from hathor.indexes.mempool_tips_index import MempoolTipsIndex
from hathor.indexes.timestamp_index import TimestampIndex
from hathor.indexes.tips_index import TipsIndex
from hathor.indexes.timestamp_index import ScopeType as TimestampScopeType, TimestampIndex
from hathor.indexes.tips_index import ScopeType as TipsScopeType, TipsIndex
from hathor.indexes.tokens_index import TokensIndex
from hathor.indexes.utxo_index import UtxoIndex
from hathor.transaction import BaseTransaction
Expand All @@ -42,12 +43,6 @@
MAX_CACHE_SIZE_DURING_LOAD = 1000


class _IndexFilter(Enum):
ALL = auto() # block or tx, voided or not
ALL_BLOCKS = auto() # only blocks that are not voided
VALID_TXS = auto() # only transactions that are not voided


class IndexesManager(ABC):
""" IndexesManager manages all the indexes that we will have in the system

Expand Down Expand Up @@ -87,29 +82,21 @@ def __init_checks__(self):

def iter_all_indexes(self) -> Iterator[BaseIndex]:
""" Iterate over all of the indexes abstracted by this manager, hiding their specific implementation details"""
for _, index in self._iter_all_indexes_with_filter():
yield index

def _iter_all_indexes_with_filter(self) -> Iterator[Tuple[_IndexFilter, BaseIndex]]:
""" Same as `iter_all_indexes()`, but includes a filter for what transactions an index is interested in."""
yield _IndexFilter.ALL, self.info
yield _IndexFilter.ALL, self.all_tips
yield _IndexFilter.ALL_BLOCKS, self.block_tips
yield _IndexFilter.VALID_TXS, self.tx_tips
yield _IndexFilter.ALL, self.sorted_all
yield _IndexFilter.ALL_BLOCKS, self.sorted_blocks
yield _IndexFilter.VALID_TXS, self.sorted_txs
yield _IndexFilter.ALL, self.height
if self.deps is not None:
yield _IndexFilter.ALL, self.deps
if self.mempool_tips is not None:
yield _IndexFilter.ALL, self.mempool_tips
if self.addresses is not None:
yield _IndexFilter.ALL, self.addresses
if self.tokens is not None:
yield _IndexFilter.ALL, self.tokens
if self.utxo is not None:
yield _IndexFilter.ALL, self.utxo
return filter(None, [
self.info,
self.all_tips,
self.block_tips,
self.tx_tips,
self.sorted_all,
self.sorted_blocks,
self.sorted_txs,
self.height,
self.deps,
self.mempool_tips,
self.addresses,
self.tokens,
self.utxo,
])

@abstractmethod
def enable_address_index(self, pubsub: 'PubSubManager') -> None:
Expand Down Expand Up @@ -149,24 +136,23 @@ def _manually_initialize(self, tx_storage: 'TransactionStorage') -> None:

db_last_started_at = tx_storage.get_last_started_at()

indexes_to_init: List[Tuple[_IndexFilter, BaseIndex]] = []
for index_filter, index in self._iter_all_indexes_with_filter():
indexes_to_init: List[BaseIndex] = []
for index in self.iter_all_indexes():
index_db_name = index.get_db_name()
if index_db_name is None:
indexes_to_init.append((index_filter, index))
indexes_to_init.append(index)
continue
index_last_started_at = tx_storage.get_index_last_started_at(index_db_name)
if db_last_started_at != index_last_started_at:
indexes_to_init.append((index_filter, index))
indexes_to_init.append(index)

if indexes_to_init:
self.log.info('there are indexes that need initialization',
indexes_to_init=[i for _, i in indexes_to_init])
self.log.info('there are indexes that need initialization', indexes_to_init=indexes_to_init)
else:
self.log.info('there are no indexes that need initialization')

# make sure that all the indexes that we're rebuilding are cleared
for _, index in indexes_to_init:
for index in indexes_to_init:
index_db_name = index.get_db_name()
if index_db_name:
tx_storage.set_index_last_started_at(index_db_name, NULL_INDEX_LAST_STARTED_AT)
Expand All @@ -184,27 +170,20 @@ def _manually_initialize(self, tx_storage: 'TransactionStorage') -> None:
for index in self.iter_all_indexes():
index.init_start(self)

self.log.debug('indexes init')
if indexes_to_init:
tx_iter = progress(tx_storage.topological_iterator(), log=self.log, total=tx_storage.get_vertices_count())
overall_scope = reduce(operator.__or__, map(lambda i: i.get_scope(), indexes_to_init))
tx_iter_inner = overall_scope.get_iterator(tx_storage)
tx_iter = progress(tx_iter_inner, log=self.log, total=tx_storage.get_vertices_count())
self.log.debug('indexes init', scope=overall_scope)
else:
tx_iter = iter([])
for tx in tx_iter:
self.log.debug('indexes init')

tx_meta = tx.get_metadata()
for tx in tx_iter:
# feed each transaction to the indexes that they are interested in
for index_filter, index in indexes_to_init:
if index_filter is _IndexFilter.ALL:
for index in indexes_to_init:
if index.get_scope().matches(tx):
index.init_loop_step(tx)
elif index_filter is _IndexFilter.ALL_BLOCKS:
if tx.is_block:
index.init_loop_step(tx)
elif index_filter is _IndexFilter.VALID_TXS:
# XXX: all indexes that use this filter treat soft-voided as voided, nothing special needed
if tx.is_transaction and not tx_meta.voided_by:
index.init_loop_step(tx)
else:
assert False, 'impossible filter'

# Restore cache capacity.
if isinstance(tx_storage, TransactionCacheStorage):
Expand Down Expand Up @@ -307,13 +286,13 @@ def __init__(self) -> None:
from hathor.indexes.memory_tips_index import MemoryTipsIndex

self.info = MemoryInfoIndex()
self.all_tips = MemoryTipsIndex()
self.block_tips = MemoryTipsIndex()
self.tx_tips = MemoryTipsIndex()
self.all_tips = MemoryTipsIndex(scope_type=TipsScopeType.ALL)
self.block_tips = MemoryTipsIndex(scope_type=TipsScopeType.BLOCKS)
self.tx_tips = MemoryTipsIndex(scope_type=TipsScopeType.TXS)

self.sorted_all = MemoryTimestampIndex()
self.sorted_blocks = MemoryTimestampIndex()
self.sorted_txs = MemoryTimestampIndex()
self.sorted_all = MemoryTimestampIndex(scope_type=TimestampScopeType.ALL)
self.sorted_blocks = MemoryTimestampIndex(scope_type=TimestampScopeType.BLOCKS)
self.sorted_txs = MemoryTimestampIndex(scope_type=TimestampScopeType.TXS)

self.addresses = None
self.tokens = None
Expand Down Expand Up @@ -362,13 +341,13 @@ def __init__(self, db: 'rocksdb.DB') -> None:

self.info = RocksDBInfoIndex(self._db)
self.height = RocksDBHeightIndex(self._db)
self.all_tips = PartialRocksDBTipsIndex(self._db, 'all')
self.block_tips = PartialRocksDBTipsIndex(self._db, 'blocks')
self.tx_tips = PartialRocksDBTipsIndex(self._db, 'txs')
self.all_tips = PartialRocksDBTipsIndex(self._db, scope_type=TipsScopeType.ALL)
self.block_tips = PartialRocksDBTipsIndex(self._db, scope_type=TipsScopeType.BLOCKS)
self.tx_tips = PartialRocksDBTipsIndex(self._db, scope_type=TipsScopeType.TXS)

self.sorted_all = RocksDBTimestampIndex(self._db, 'all')
self.sorted_blocks = RocksDBTimestampIndex(self._db, 'blocks')
self.sorted_txs = RocksDBTimestampIndex(self._db, 'txs')
self.sorted_all = RocksDBTimestampIndex(self._db, scope_type=TimestampScopeType.ALL)
self.sorted_blocks = RocksDBTimestampIndex(self._db, scope_type=TimestampScopeType.BLOCKS)
self.sorted_txs = RocksDBTimestampIndex(self._db, scope_type=TimestampScopeType.TXS)

self.addresses = None
self.tokens = None
Expand Down
5 changes: 3 additions & 2 deletions hathor/indexes/memory_timestamp_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from sortedcontainers import SortedKeyList
from structlog import get_logger

from hathor.indexes.timestamp_index import RangeIdx, TimestampIndex
from hathor.indexes.timestamp_index import RangeIdx, ScopeType, TimestampIndex
from hathor.indexes.utils import (
TransactionIndexElement,
get_newer_sorted_key_list,
Expand All @@ -35,7 +35,8 @@ class MemoryTimestampIndex(TimestampIndex):

_index: 'SortedKeyList[TransactionIndexElement]'

def __init__(self) -> None:
def __init__(self, *, scope_type: ScopeType):
super().__init__(scope_type=scope_type)
self.log = logger.new()
self.force_clear()

Expand Down
5 changes: 3 additions & 2 deletions hathor/indexes/memory_tips_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from intervaltree import Interval, IntervalTree
from structlog import get_logger

from hathor.indexes.tips_index import TipsIndex
from hathor.indexes.tips_index import ScopeType, TipsIndex
from hathor.transaction import BaseTransaction

logger = get_logger()
Expand Down Expand Up @@ -47,7 +47,8 @@ class MemoryTipsIndex(TipsIndex):
# It is useful because the interval tree allows access only by the interval.
tx_last_interval: Dict[bytes, Interval]

def __init__(self) -> None:
def __init__(self, *, scope_type: ScopeType):
super().__init__(scope_type=scope_type)
self.log = logger.new()
self.tree = IntervalTree()
self.tx_last_interval = {}
Expand Down
Loading