From 4ed063fd0a238c5dcca042cd42a5fd84c1f4b8bc Mon Sep 17 00:00:00 2001 From: Jan Segre Date: Mon, 6 Mar 2023 15:18:59 -0300 Subject: [PATCH 1/3] chore(tests): move `-n auto` to pyproject.toml --- Makefile | 8 ++++---- pyproject.toml | 1 + 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/Makefile b/Makefile index a57c7a7e8..117b8b5b5 100644 --- a/Makefile +++ b/Makefile @@ -25,7 +25,7 @@ pytest_flags = -p no:warnings --cov-report=term --cov-report=html --cov-report=x .PHONY: tests-cli tests-cli: - pytest -n auto --durations=10 --cov=hathor/cli/ --cov-config=.coveragerc_full --cov-fail-under=27 -p no:warnings $(tests_cli) + pytest --durations=10 --cov=hathor/cli/ --cov-config=.coveragerc_full --cov-fail-under=27 -p no:warnings $(tests_cli) .PHONY: tests-doctests tests-doctests: @@ -33,7 +33,7 @@ tests-doctests: .PHONY: tests-lib tests-lib: - pytest -n auto --durations=10 $(pytest_flags) --doctest-modules hathor $(tests_lib) + pytest --durations=10 $(pytest_flags) --doctest-modules hathor $(tests_lib) .PHONY: tests-quick tests-quick: @@ -41,8 +41,8 @@ tests-quick: .PHONY: tests-genesis tests-genesis: - HATHOR_TEST_CONFIG_FILE=hathor.conf.mainnet pytest -n auto tests/tx/test_genesis.py - HATHOR_TEST_CONFIG_FILE=hathor.conf.testnet pytest -n auto tests/tx/test_genesis.py + HATHOR_TEST_CONFIG_FILE=hathor.conf.mainnet pytest tests/tx/test_genesis.py + HATHOR_TEST_CONFIG_FILE=hathor.conf.testnet pytest tests/tx/test_genesis.py .PHONY: tests tests: tests-cli tests-lib tests-genesis diff --git a/pyproject.toml b/pyproject.toml index 18e3b023d..c23f967c3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -134,6 +134,7 @@ warn_untyped_fields = true [tool.pytest.ini_options] minversion = "6.0" testpaths = ["tests"] +addopts = "-n auto" markers = [ "slow", ] From 274b275da61af75723b30653e0c6cb32711940af Mon Sep 17 00:00:00 2001 From: Jan Segre Date: Fri, 3 Mar 2023 13:06:44 -0300 Subject: [PATCH 2/3] refactor(indexes): move interest scope to inside the index --- hathor/indexes/address_index.py | 10 ++ hathor/indexes/base_index.py | 77 ++++++++++++- hathor/indexes/deps_index.py | 12 ++- hathor/indexes/height_index.py | 11 +- hathor/indexes/info_index.py | 13 ++- hathor/indexes/manager.py | 102 ++++++++---------- hathor/indexes/memory_timestamp_index.py | 3 +- hathor/indexes/memory_tips_index.py | 3 +- hathor/indexes/mempool_tips_index.py | 11 +- hathor/indexes/partial_rocksdb_tips_index.py | 11 +- hathor/indexes/rocksdb_timestamp_index.py | 10 +- hathor/indexes/timestamp_index.py | 34 +++++- hathor/indexes/tips_index.py | 34 +++++- hathor/indexes/tokens_index.py | 11 +- hathor/indexes/utxo_index.py | 11 +- hathor/transaction/storage/memory_storage.py | 2 +- hathor/transaction/storage/rocksdb_storage.py | 8 +- .../storage/transaction_storage.py | 4 +- tests/others/test_init_manager.py | 4 +- tests/tx/test_indexes2.py | 4 +- tests/tx/test_stratum.py | 3 +- 21 files changed, 295 insertions(+), 83 deletions(-) diff --git a/hathor/indexes/address_index.py b/hathor/indexes/address_index.py index b8925b927..fca7ce4ca 100644 --- a/hathor/indexes/address_index.py +++ b/hathor/indexes/address_index.py @@ -17,6 +17,7 @@ from structlog import get_logger +from hathor.indexes.base_index import Scope from hathor.indexes.tx_group_index import TxGroupIndex from hathor.pubsub import HathorEvents from hathor.transaction import BaseTransaction @@ -26,12 +27,21 @@ logger = get_logger() +SCOPE = Scope( + include_blocks=True, + include_txs=True, + include_voided=True, +) + class AddressIndex(TxGroupIndex[str]): """ Index of inputs/outputs by address """ pubsub: Optional['PubSubManager'] + def get_scope(self) -> Scope: + return SCOPE + def init_loop_step(self, tx: BaseTransaction) -> None: self.add_tx(tx) diff --git a/hathor/indexes/base_index.py b/hathor/indexes/base_index.py index 9503c673f..1edd60dc1 100644 --- a/hathor/indexes/base_index.py +++ b/hathor/indexes/base_index.py @@ -13,12 +13,82 @@ # limitations under the License. from abc import ABC, abstractmethod -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING, Iterator, NamedTuple, Optional from hathor.transaction.base_transaction import BaseTransaction if TYPE_CHECKING: # pragma: no cover from hathor.indexes.manager import IndexesManager + from hathor.transaction.storage import TransactionStorage + + +class Scope(NamedTuple): + """ This class models the scope of transactions that an index is interested in. + + It is used for both selecting the optimal iterator for all the indexes that need to be initialized and for + filtering which transactions are fed to the index. + """ + include_blocks: bool + include_txs: bool + include_voided: bool + # XXX: these have a default value since it should be really rare to have it different + include_partial: bool = False + topological_order: bool = True # if False than ordering doesn't matter + + # XXX: this is used to join the scope of multiple indexes to get an overall scope that includes everything that + # each individual scope needs, the OR operator was chosen because it represents well the operation of keeping + # a property if either A or B needs it + def __or__(self, other): + # XXX: note that this doesn't necessarily have to be OR operations between properties, we want the operations + # that broaden the scope, and not narrow it. + # XXX: in the case of topological_order, we want to keep the "topological" ordering if any of them requires it, + # so it also is an OR operator + return Scope( + include_blocks=self.include_blocks | other.include_blocks, + include_txs=self.include_txs | other.include_txs, + include_voided=self.include_voided | other.include_voided, + include_partial=self.include_partial | other.include_partial, + topological_order=self.topological_order | other.topological_order, + ) + + def matches(self, tx: BaseTransaction) -> bool: + """ Check if a transaction matches this scope, True means the index is interested in this transaction. + """ + if tx.is_block and not self.include_blocks: + return False + if tx.is_transaction and not self.include_txs: + return False + tx_meta = tx.get_metadata() + if tx_meta.voided_by and not self.include_voided: + return False + if not tx_meta.validation.is_fully_connected() and not self.include_partial: + return False + # XXX: self.topologial_order doesn't affect self.match() + # passed all checks + return True + + def get_iterator(self, tx_storage: 'TransactionStorage') -> Iterator[BaseTransaction]: + iterator: Iterator[BaseTransaction] + # XXX: this is to mark if the chosen iterator will yield partial transactions + iterator_covers_partial: bool + if self.topological_order: + iterator = tx_storage.topological_iterator() + iterator_covers_partial = False + else: + iterator = tx_storage.get_all_transactions() + iterator_covers_partial = True + for tx in iterator: + if self.matches(tx): + yield tx + if self.include_partial and not iterator_covers_partial: + # if partial transactions are needed and were not already covered, we use get_all_transactions, which + # includes partial transactions, to yield them, skipping all that aren't partial + for tx in tx_storage.get_all_transactions(): + tx_meta = tx.get_metadata() + if tx_meta.validation.is_fully_connected(): + continue + if self.matches(tx): + yield tx class BaseIndex(ABC): @@ -35,6 +105,11 @@ def init_start(self, indexes_manager: 'IndexesManager') -> None: """ pass + @abstractmethod + def get_scope(self) -> Scope: + """ Returns the scope of interest of this index, whether the scope is configurable is up to the index.""" + raise NotImplementedError + @abstractmethod def get_db_name(self) -> Optional[str]: """ The returned string is used to generate the relevant attributes for storing an indexe's state in the db. diff --git a/hathor/indexes/deps_index.py b/hathor/indexes/deps_index.py index 0b7d07276..f4529c7e3 100644 --- a/hathor/indexes/deps_index.py +++ b/hathor/indexes/deps_index.py @@ -15,7 +15,7 @@ from abc import abstractmethod from typing import TYPE_CHECKING, Iterator, List -from hathor.indexes.base_index import BaseIndex +from hathor.indexes.base_index import BaseIndex, Scope from hathor.transaction import BaseTransaction, Block if TYPE_CHECKING: # pragma: no cover @@ -25,6 +25,13 @@ # XXX: this arbitrary height limit must fit in a u32 (4-bytes unsigned), so it can be stored easily on rocksdb INF_HEIGHT: int = 2**32 - 1 +SCOPE = Scope( + include_blocks=True, + include_txs=True, + include_voided=True, + include_partial=True +) + def get_requested_from_height(tx: BaseTransaction) -> int: """Return the height of the block that requested (directly or indirectly) the download of this transaction. @@ -105,6 +112,9 @@ class DepsIndex(BaseIndex): them. """ + def get_scope(self) -> Scope: + return SCOPE + def init_loop_step(self, tx: BaseTransaction) -> None: tx_meta = tx.get_metadata() if tx_meta.voided_by: diff --git a/hathor/indexes/height_index.py b/hathor/indexes/height_index.py index 655fe7e70..fd591951e 100644 --- a/hathor/indexes/height_index.py +++ b/hathor/indexes/height_index.py @@ -15,11 +15,17 @@ from abc import abstractmethod from typing import List, NamedTuple, Optional, Tuple -from hathor.indexes.base_index import BaseIndex +from hathor.indexes.base_index import BaseIndex, Scope from hathor.transaction import BaseTransaction, Block from hathor.transaction.genesis import BLOCK_GENESIS from hathor.util import not_none +SCOPE = Scope( + include_blocks=True, + include_txs=False, + include_voided=True, +) + class IndexEntry(NamedTuple): """Helper named tuple that implementations can use.""" @@ -40,6 +46,9 @@ class HeightIndex(BaseIndex): """Store the block hash for each given height """ + def get_scope(self) -> Scope: + return SCOPE + def init_loop_step(self, tx: BaseTransaction) -> None: if not tx.is_block: return diff --git a/hathor/indexes/info_index.py b/hathor/indexes/info_index.py index 96c200eed..6da5345e9 100644 --- a/hathor/indexes/info_index.py +++ b/hathor/indexes/info_index.py @@ -16,11 +16,19 @@ from structlog import get_logger -from hathor.indexes.base_index import BaseIndex +from hathor.indexes.base_index import BaseIndex, Scope from hathor.transaction import BaseTransaction logger = get_logger() +SCOPE = Scope( + include_blocks=True, + include_txs=True, + include_voided=True, + # XXX: this index doesn't care about the ordering + topological_order=False, +) + class InfoIndex(BaseIndex): """ Index of general information about the storage @@ -30,6 +38,9 @@ def init_loop_step(self, tx: BaseTransaction) -> None: self.update_timestamps(tx) self.update_counts(tx) + def get_scope(self) -> Scope: + return SCOPE + @abstractmethod def update_timestamps(self, tx: BaseTransaction) -> None: raise NotImplementedError diff --git a/hathor/indexes/manager.py b/hathor/indexes/manager.py index bb4ccfe37..50fe42753 100644 --- a/hathor/indexes/manager.py +++ b/hathor/indexes/manager.py @@ -12,9 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +import operator from abc import ABC, abstractmethod -from enum import Enum, auto -from typing import TYPE_CHECKING, Iterator, List, Optional, Tuple +from functools import reduce +from typing import TYPE_CHECKING, Iterator, List, Optional from structlog import get_logger @@ -42,12 +43,6 @@ MAX_CACHE_SIZE_DURING_LOAD = 1000 -class _IndexFilter(Enum): - ALL = auto() # block or tx, voided or not - ALL_BLOCKS = auto() # only blocks that are not voided - VALID_TXS = auto() # only transactions that are not voided - - class IndexesManager(ABC): """ IndexesManager manages all the indexes that we will have in the system @@ -87,29 +82,24 @@ def __init_checks__(self): def iter_all_indexes(self) -> Iterator[BaseIndex]: """ Iterate over all of the indexes abstracted by this manager, hiding their specific implementation details""" - for _, index in self._iter_all_indexes_with_filter(): - yield index - - def _iter_all_indexes_with_filter(self) -> Iterator[Tuple[_IndexFilter, BaseIndex]]: - """ Same as `iter_all_indexes()`, but includes a filter for what transactions an index is interested in.""" - yield _IndexFilter.ALL, self.info - yield _IndexFilter.ALL, self.all_tips - yield _IndexFilter.ALL_BLOCKS, self.block_tips - yield _IndexFilter.VALID_TXS, self.tx_tips - yield _IndexFilter.ALL, self.sorted_all - yield _IndexFilter.ALL_BLOCKS, self.sorted_blocks - yield _IndexFilter.VALID_TXS, self.sorted_txs - yield _IndexFilter.ALL, self.height + yield self.info + yield self.all_tips + yield self.block_tips + yield self.tx_tips + yield self.sorted_all + yield self.sorted_blocks + yield self.sorted_txs + yield self.height if self.deps is not None: - yield _IndexFilter.ALL, self.deps + yield self.deps if self.mempool_tips is not None: - yield _IndexFilter.ALL, self.mempool_tips + yield self.mempool_tips if self.addresses is not None: - yield _IndexFilter.ALL, self.addresses + yield self.addresses if self.tokens is not None: - yield _IndexFilter.ALL, self.tokens + yield self.tokens if self.utxo is not None: - yield _IndexFilter.ALL, self.utxo + yield self.utxo @abstractmethod def enable_address_index(self, pubsub: 'PubSubManager') -> None: @@ -149,24 +139,23 @@ def _manually_initialize(self, tx_storage: 'TransactionStorage') -> None: db_last_started_at = tx_storage.get_last_started_at() - indexes_to_init: List[Tuple[_IndexFilter, BaseIndex]] = [] - for index_filter, index in self._iter_all_indexes_with_filter(): + indexes_to_init: List[BaseIndex] = [] + for index in self.iter_all_indexes(): index_db_name = index.get_db_name() if index_db_name is None: - indexes_to_init.append((index_filter, index)) + indexes_to_init.append(index) continue index_last_started_at = tx_storage.get_index_last_started_at(index_db_name) if db_last_started_at != index_last_started_at: - indexes_to_init.append((index_filter, index)) + indexes_to_init.append(index) if indexes_to_init: - self.log.info('there are indexes that need initialization', - indexes_to_init=[i for _, i in indexes_to_init]) + self.log.info('there are indexes that need initialization', indexes_to_init=indexes_to_init) else: self.log.info('there are no indexes that need initialization') # make sure that all the indexes that we're rebuilding are cleared - for _, index in indexes_to_init: + for index in indexes_to_init: index_db_name = index.get_db_name() if index_db_name: tx_storage.set_index_last_started_at(index_db_name, NULL_INDEX_LAST_STARTED_AT) @@ -184,27 +173,20 @@ def _manually_initialize(self, tx_storage: 'TransactionStorage') -> None: for index in self.iter_all_indexes(): index.init_start(self) - self.log.debug('indexes init') if indexes_to_init: - tx_iter = progress(tx_storage.topological_iterator(), log=self.log, total=tx_storage.get_vertices_count()) + overall_scope = reduce(operator.__or__, map(lambda i: i.get_scope(), indexes_to_init)) + tx_iter_inner = overall_scope.get_iterator(tx_storage) + tx_iter = progress(tx_iter_inner, log=self.log, total=tx_storage.get_vertices_count()) + self.log.debug('indexes init', scope=overall_scope) else: tx_iter = iter([]) - for tx in tx_iter: + self.log.debug('indexes init') - tx_meta = tx.get_metadata() + for tx in tx_iter: # feed each transaction to the indexes that they are interested in - for index_filter, index in indexes_to_init: - if index_filter is _IndexFilter.ALL: + for index in indexes_to_init: + if index.get_scope().matches(tx): index.init_loop_step(tx) - elif index_filter is _IndexFilter.ALL_BLOCKS: - if tx.is_block: - index.init_loop_step(tx) - elif index_filter is _IndexFilter.VALID_TXS: - # XXX: all indexes that use this filter treat soft-voided as voided, nothing special needed - if tx.is_transaction and not tx_meta.voided_by: - index.init_loop_step(tx) - else: - assert False, 'impossible filter' # Restore cache capacity. if isinstance(tx_storage, TransactionCacheStorage): @@ -307,13 +289,13 @@ def __init__(self) -> None: from hathor.indexes.memory_tips_index import MemoryTipsIndex self.info = MemoryInfoIndex() - self.all_tips = MemoryTipsIndex() - self.block_tips = MemoryTipsIndex() - self.tx_tips = MemoryTipsIndex() + self.all_tips = MemoryTipsIndex(all=True) + self.block_tips = MemoryTipsIndex(blocks=True) + self.tx_tips = MemoryTipsIndex(txs=True) - self.sorted_all = MemoryTimestampIndex() - self.sorted_blocks = MemoryTimestampIndex() - self.sorted_txs = MemoryTimestampIndex() + self.sorted_all = MemoryTimestampIndex(all=True) + self.sorted_blocks = MemoryTimestampIndex(blocks=True) + self.sorted_txs = MemoryTimestampIndex(txs=True) self.addresses = None self.tokens = None @@ -362,13 +344,13 @@ def __init__(self, db: 'rocksdb.DB') -> None: self.info = RocksDBInfoIndex(self._db) self.height = RocksDBHeightIndex(self._db) - self.all_tips = PartialRocksDBTipsIndex(self._db, 'all') - self.block_tips = PartialRocksDBTipsIndex(self._db, 'blocks') - self.tx_tips = PartialRocksDBTipsIndex(self._db, 'txs') + self.all_tips = PartialRocksDBTipsIndex(self._db, all=True) + self.block_tips = PartialRocksDBTipsIndex(self._db, blocks=True) + self.tx_tips = PartialRocksDBTipsIndex(self._db, txs=True) - self.sorted_all = RocksDBTimestampIndex(self._db, 'all') - self.sorted_blocks = RocksDBTimestampIndex(self._db, 'blocks') - self.sorted_txs = RocksDBTimestampIndex(self._db, 'txs') + self.sorted_all = RocksDBTimestampIndex(self._db, all=True) + self.sorted_blocks = RocksDBTimestampIndex(self._db, blocks=True) + self.sorted_txs = RocksDBTimestampIndex(self._db, txs=True) self.addresses = None self.tokens = None diff --git a/hathor/indexes/memory_timestamp_index.py b/hathor/indexes/memory_timestamp_index.py index d61e32677..d749a4924 100644 --- a/hathor/indexes/memory_timestamp_index.py +++ b/hathor/indexes/memory_timestamp_index.py @@ -35,7 +35,8 @@ class MemoryTimestampIndex(TimestampIndex): _index: 'SortedKeyList[TransactionIndexElement]' - def __init__(self) -> None: + def __init__(self, *, txs: bool = False, blocks: bool = False, all: bool = False): + super().__init__(txs=txs, blocks=blocks, all=all) self.log = logger.new() self.force_clear() diff --git a/hathor/indexes/memory_tips_index.py b/hathor/indexes/memory_tips_index.py index 46ff14f61..12134ba21 100644 --- a/hathor/indexes/memory_tips_index.py +++ b/hathor/indexes/memory_tips_index.py @@ -47,7 +47,8 @@ class MemoryTipsIndex(TipsIndex): # It is useful because the interval tree allows access only by the interval. tx_last_interval: Dict[bytes, Interval] - def __init__(self) -> None: + def __init__(self, *, txs: bool = False, blocks: bool = False, all: bool = False): + super().__init__(txs=txs, blocks=blocks, all=all) self.log = logger.new() self.tree = IntervalTree() self.tx_last_interval = {} diff --git a/hathor/indexes/mempool_tips_index.py b/hathor/indexes/mempool_tips_index.py index 9c322fcd8..fde3557d4 100644 --- a/hathor/indexes/mempool_tips_index.py +++ b/hathor/indexes/mempool_tips_index.py @@ -18,17 +18,26 @@ import structlog -from hathor.indexes.base_index import BaseIndex +from hathor.indexes.base_index import BaseIndex, Scope from hathor.transaction import BaseTransaction, Transaction from hathor.util import not_none if TYPE_CHECKING: # pragma: no cover from hathor.transaction.storage import TransactionStorage +SCOPE = Scope( + include_blocks=True, + include_txs=True, + include_voided=True, +) + class MempoolTipsIndex(BaseIndex): """Index to access the tips of the mempool transactions, which haven't been confirmed by a block.""" + def get_scope(self) -> Scope: + return SCOPE + def init_loop_step(self, tx: BaseTransaction) -> None: self.update(tx) diff --git a/hathor/indexes/partial_rocksdb_tips_index.py b/hathor/indexes/partial_rocksdb_tips_index.py index 8c59f3cb3..3bb8971fb 100644 --- a/hathor/indexes/partial_rocksdb_tips_index.py +++ b/hathor/indexes/partial_rocksdb_tips_index.py @@ -111,8 +111,15 @@ class PartialRocksDBTipsIndex(MemoryTipsIndex, RocksDBIndexUtils): # It is useful because the interval tree allows access only by the interval. tx_last_interval: Dict[bytes, Interval] - def __init__(self, db: 'rocksdb.DB', name: str) -> None: - MemoryTipsIndex.__init__(self) + def __init__(self, db: 'rocksdb.DB', *, txs: bool = False, blocks: bool = False, all: bool = False): + MemoryTipsIndex.__init__(self, txs=txs, blocks=blocks, all=all) + name: str + if txs: + name = 'txs' + elif blocks: + name = 'blocks' + elif all: + name = 'all' self.log = logger.new() # XXX: override MemoryTipsIndex logger so it shows the correct module RocksDBIndexUtils.__init__(self, db, f'tips-{name}'.encode()) self._name = name diff --git a/hathor/indexes/rocksdb_timestamp_index.py b/hathor/indexes/rocksdb_timestamp_index.py index a530b2453..d72ada0f4 100644 --- a/hathor/indexes/rocksdb_timestamp_index.py +++ b/hathor/indexes/rocksdb_timestamp_index.py @@ -38,7 +38,15 @@ class RocksDBTimestampIndex(TimestampIndex, RocksDBIndexUtils): It works nicely because rocksdb uses a tree sorted by key under the hood. """ - def __init__(self, db: 'rocksdb.DB', name: str) -> None: + def __init__(self, db: 'rocksdb.DB', *, txs: bool = False, blocks: bool = False, all: bool = False): + TimestampIndex.__init__(self, txs=txs, blocks=blocks, all=all) + name: str + if txs: + name = 'txs' + elif blocks: + name = 'blocks' + elif all: + name = 'all' self.log = logger.new() RocksDBIndexUtils.__init__(self, db, f'timestamp-sorted-{name}'.encode()) self._name = name diff --git a/hathor/indexes/timestamp_index.py b/hathor/indexes/timestamp_index.py index e2dea623e..bea994083 100644 --- a/hathor/indexes/timestamp_index.py +++ b/hathor/indexes/timestamp_index.py @@ -17,11 +17,29 @@ from structlog import get_logger -from hathor.indexes.base_index import BaseIndex +from hathor.indexes.base_index import BaseIndex, Scope from hathor.transaction import BaseTransaction logger = get_logger() +SCOPE_ALL = Scope( + include_blocks=True, + include_txs=True, + include_voided=True, +) + +SCOPE_TXS = Scope( + include_blocks=False, + include_txs=True, + include_voided=False, +) + +SCOPE_BLOCKS = Scope( + include_blocks=True, + include_txs=False, + include_voided=True, +) + class RangeIdx(NamedTuple): timestamp: int @@ -32,6 +50,20 @@ class TimestampIndex(BaseIndex): """ Index of transactions sorted by their timestamps. """ + def __init__(self, *, txs: bool = False, blocks: bool = False, all: bool = False): + if sum([txs, blocks, all]) != 1: + raise TypeError('Exactly one of these: txs, blocks, all, must be set to True') + self._scope: Scope + if txs: + self._scope = SCOPE_TXS + elif blocks: + self._scope = SCOPE_BLOCKS + elif all: + self._scope = SCOPE_ALL + + def get_scope(self) -> Scope: + return self._scope + def init_loop_step(self, tx: BaseTransaction) -> None: self.add_tx(tx) diff --git a/hathor/indexes/tips_index.py b/hathor/indexes/tips_index.py index 1b85cc523..b1cedf593 100644 --- a/hathor/indexes/tips_index.py +++ b/hathor/indexes/tips_index.py @@ -18,11 +18,29 @@ from intervaltree import Interval from structlog import get_logger -from hathor.indexes.base_index import BaseIndex +from hathor.indexes.base_index import BaseIndex, Scope from hathor.transaction import BaseTransaction logger = get_logger() +SCOPE_ALL = Scope( + include_blocks=True, + include_txs=True, + include_voided=True, +) + +SCOPE_TXS = Scope( + include_blocks=False, + include_txs=True, + include_voided=False, +) + +SCOPE_BLOCKS = Scope( + include_blocks=True, + include_txs=False, + include_voided=True, +) + class TipsIndex(BaseIndex): """ Use an interval tree to quick get the tips at a given timestamp. @@ -38,6 +56,20 @@ class TipsIndex(BaseIndex): TODO Use an interval tree stored in disk, possibly using a B-tree. """ + def __init__(self, *, txs: bool = False, blocks: bool = False, all: bool = False): + if sum([txs, blocks, all]) != 1: + raise TypeError('Exactly one of these: txs, blocks, all, must be set to True') + self._scope: Scope + if txs: + self._scope = SCOPE_TXS + elif blocks: + self._scope = SCOPE_BLOCKS + elif all: + self._scope = SCOPE_ALL + + def get_scope(self) -> Scope: + return self._scope + @abstractmethod def add_tx(self, tx: BaseTransaction) -> bool: """ Add a new transaction to the index diff --git a/hathor/indexes/tokens_index.py b/hathor/indexes/tokens_index.py index 27c38fa1d..5a5228927 100644 --- a/hathor/indexes/tokens_index.py +++ b/hathor/indexes/tokens_index.py @@ -15,9 +15,15 @@ from abc import ABC, abstractmethod from typing import Iterator, List, NamedTuple, Optional, Tuple -from hathor.indexes.base_index import BaseIndex +from hathor.indexes.base_index import BaseIndex, Scope from hathor.transaction import BaseTransaction +SCOPE = Scope( + include_blocks=False, + include_txs=True, + include_voided=True, +) + class TokenUtxoInfo(NamedTuple): tx_hash: bytes @@ -62,6 +68,9 @@ class TokensIndex(BaseIndex): """ Index of tokens by token uid """ + def get_scope(self) -> Scope: + return SCOPE + def init_loop_step(self, tx: BaseTransaction) -> None: tx_meta = tx.get_metadata() if tx_meta.voided_by: diff --git a/hathor/indexes/utxo_index.py b/hathor/indexes/utxo_index.py index 3c788bca9..b559cffe0 100644 --- a/hathor/indexes/utxo_index.py +++ b/hathor/indexes/utxo_index.py @@ -19,7 +19,7 @@ from structlog import get_logger from hathor.conf import HathorSettings -from hathor.indexes.base_index import BaseIndex +from hathor.indexes.base_index import BaseIndex, Scope from hathor.transaction import BaseTransaction, TxOutput from hathor.transaction.scripts import parse_address_script from hathor.util import sorted_merger @@ -27,6 +27,12 @@ logger = get_logger() settings = HathorSettings() +SCOPE = Scope( + include_blocks=True, + include_txs=True, + include_voided=True, +) + @dataclass(frozen=True) class UtxoIndexItem: @@ -104,6 +110,9 @@ def __init__(self): # interface methods provided by the base class + def get_scope(self) -> Scope: + return SCOPE + def init_loop_step(self, tx: BaseTransaction) -> None: self.update(tx) diff --git a/hathor/transaction/storage/memory_storage.py b/hathor/transaction/storage/memory_storage.py index c363ce188..8d57eb1ae 100644 --- a/hathor/transaction/storage/memory_storage.py +++ b/hathor/transaction/storage/memory_storage.py @@ -89,7 +89,7 @@ def _get_transaction(self, hash_bytes: bytes) -> BaseTransaction: else: raise TransactionDoesNotExist(hash_bytes.hex()) - def get_all_transactions(self) -> Iterator[BaseTransaction]: + def get_all_transactions(self, *, include_partial: bool = False) -> Iterator[BaseTransaction]: for tx in self.transactions.values(): tx = self._clone(tx) if tx.hash in self.metadata: diff --git a/hathor/transaction/storage/rocksdb_storage.py b/hathor/transaction/storage/rocksdb_storage.py index c6e48feae..7ff540359 100644 --- a/hathor/transaction/storage/rocksdb_storage.py +++ b/hathor/transaction/storage/rocksdb_storage.py @@ -16,6 +16,7 @@ from structlog import get_logger +from hathor.conf import HathorSettings from hathor.indexes import IndexesManager, MemoryIndexesManager, RocksDBIndexesManager from hathor.storage import RocksDBStorage from hathor.transaction.storage.exceptions import TransactionDoesNotExist @@ -29,6 +30,7 @@ from hathor.transaction import BaseTransaction, TransactionMetadata logger = get_logger() +settings = HathorSettings() _DB_NAME = 'data_v2.db' _CF_NAME_TX = b'tx' @@ -146,7 +148,7 @@ def _get_tx(self, hash_bytes: bytes, tx_data: bytes) -> 'BaseTransaction': self._save_to_weakref(tx) return tx - def get_all_transactions(self) -> Iterator['BaseTransaction']: + def get_all_transactions(self, *, include_partial: bool = False) -> Iterator['BaseTransaction']: tx: Optional['BaseTransaction'] items = self._db.iteritems(self._cf_tx) @@ -163,6 +165,10 @@ def get_all_transactions(self) -> Iterator['BaseTransaction']: tx = self._get_tx(hash_bytes, tx_data) assert tx is not None + if not include_partial: + assert tx._metadata is not None + if not tx._metadata.validation.is_fully_connected(): + continue yield tx def is_empty(self) -> bool: diff --git a/hathor/transaction/storage/transaction_storage.py b/hathor/transaction/storage/transaction_storage.py index c8b920a00..830353bf0 100644 --- a/hathor/transaction/storage/transaction_storage.py +++ b/hathor/transaction/storage/transaction_storage.py @@ -480,7 +480,7 @@ def get_metadata(self, hash_bytes: bytes) -> Optional[TransactionMetadata]: return None @abstractmethod - def get_all_transactions(self) -> Iterator[BaseTransaction]: + def get_all_transactions(self, *, include_partial: bool = False) -> Iterator[BaseTransaction]: # TODO: verify the following claim: """Return all transactions that are not blocks. """ @@ -1112,7 +1112,7 @@ def _topological_sort_timestamp_index(self) -> Iterator[BaseTransaction]: yield from cur_blocks yield from cur_txs - def _topological_sort_metadata(self) -> Iterator[BaseTransaction]: + def _topological_sort_metadata(self, *, include_partial: bool = False) -> Iterator[BaseTransaction]: import heapq from dataclasses import dataclass, field diff --git a/tests/others/test_init_manager.py b/tests/others/test_init_manager.py index 027ab3af2..375f74c32 100644 --- a/tests/others/test_init_manager.py +++ b/tests/others/test_init_manager.py @@ -25,12 +25,12 @@ def __init__(self, *args, **kwargs): def set_first_tx(self, tx: BaseTransaction) -> None: self._first_tx = tx - def get_all_transactions(self) -> Iterator[BaseTransaction]: + def get_all_transactions(self, *, include_partial: bool = False) -> Iterator[BaseTransaction]: skip_hash = None if self._first_tx: yield self._first_tx skip_hash = self._first_tx.hash - for tx in super().get_all_transactions(): + for tx in super().get_all_transactions(include_partial=include_partial): if tx.hash != skip_hash: yield tx diff --git a/tests/tx/test_indexes2.py b/tests/tx/test_indexes2.py index d53212b0e..6a7ac47fe 100644 --- a/tests/tx/test_indexes2.py +++ b/tests/tx/test_indexes2.py @@ -45,8 +45,8 @@ def test_timestamp_index(self): from hathor.indexes.memory_timestamp_index import MemoryTimestampIndex from hathor.indexes.rocksdb_timestamp_index import RocksDBTimestampIndex from hathor.indexes.timestamp_index import RangeIdx - rocksdb_index = RocksDBTimestampIndex(self.create_tmp_rocksdb_db(), 'foo') - memory_index = MemoryTimestampIndex() + rocksdb_index = RocksDBTimestampIndex(self.create_tmp_rocksdb_db(), all=True) + memory_index = MemoryTimestampIndex(all=True) for tx in self.transactions: rocksdb_index.add_tx(tx) memory_index.add_tx(tx) diff --git a/tests/tx/test_stratum.py b/tests/tx/test_stratum.py index dd4e531f7..0769c10af 100644 --- a/tests/tx/test_stratum.py +++ b/tests/tx/test_stratum.py @@ -38,7 +38,8 @@ def setUp(self): super().setUp() self.manager = self.create_peer('testnet') self.manager.allow_mining_without_peers() - self.factory = StratumFactory(self.manager, port=8123, reactor=MemoryReactorHeapClock()) + port = self.rng.randint(8000, 9000) + self.factory = StratumFactory(self.manager, port=port, reactor=MemoryReactorHeapClock()) self.factory.start() self.protocol = self.factory.buildProtocol('127.0.0.1') self.transport = StringTransportWithDisconnection() From 24038b97481220e35c18ee93dfa3221a80db53e6 Mon Sep 17 00:00:00 2001 From: Jan Segre Date: Wed, 8 Mar 2023 17:18:03 -0300 Subject: [PATCH 3/3] review changes 1 --- hathor/indexes/address_index.py | 2 +- hathor/indexes/base_index.py | 73 +-------------- hathor/indexes/deps_index.py | 3 +- hathor/indexes/height_index.py | 3 +- hathor/indexes/info_index.py | 3 +- hathor/indexes/manager.py | 61 ++++++------- hathor/indexes/memory_timestamp_index.py | 6 +- hathor/indexes/memory_tips_index.py | 6 +- hathor/indexes/mempool_tips_index.py | 3 +- hathor/indexes/partial_rocksdb_tips_index.py | 16 +--- hathor/indexes/rocksdb_timestamp_index.py | 17 +--- hathor/indexes/scope.py | 91 +++++++++++++++++++ hathor/indexes/timestamp_index.py | 51 +++++------ hathor/indexes/tips_index.py | 51 +++++------ hathor/indexes/tokens_index.py | 3 +- hathor/indexes/utxo_index.py | 3 +- hathor/transaction/storage/rocksdb_storage.py | 2 - tests/tx/test_indexes2.py | 6 +- 18 files changed, 202 insertions(+), 198 deletions(-) create mode 100644 hathor/indexes/scope.py diff --git a/hathor/indexes/address_index.py b/hathor/indexes/address_index.py index fca7ce4ca..73c8da1ef 100644 --- a/hathor/indexes/address_index.py +++ b/hathor/indexes/address_index.py @@ -17,7 +17,7 @@ from structlog import get_logger -from hathor.indexes.base_index import Scope +from hathor.indexes.scope import Scope from hathor.indexes.tx_group_index import TxGroupIndex from hathor.pubsub import HathorEvents from hathor.transaction import BaseTransaction diff --git a/hathor/indexes/base_index.py b/hathor/indexes/base_index.py index 1edd60dc1..b68fc8b17 100644 --- a/hathor/indexes/base_index.py +++ b/hathor/indexes/base_index.py @@ -13,82 +13,13 @@ # limitations under the License. from abc import ABC, abstractmethod -from typing import TYPE_CHECKING, Iterator, NamedTuple, Optional +from typing import TYPE_CHECKING, Optional +from hathor.indexes.scope import Scope from hathor.transaction.base_transaction import BaseTransaction if TYPE_CHECKING: # pragma: no cover from hathor.indexes.manager import IndexesManager - from hathor.transaction.storage import TransactionStorage - - -class Scope(NamedTuple): - """ This class models the scope of transactions that an index is interested in. - - It is used for both selecting the optimal iterator for all the indexes that need to be initialized and for - filtering which transactions are fed to the index. - """ - include_blocks: bool - include_txs: bool - include_voided: bool - # XXX: these have a default value since it should be really rare to have it different - include_partial: bool = False - topological_order: bool = True # if False than ordering doesn't matter - - # XXX: this is used to join the scope of multiple indexes to get an overall scope that includes everything that - # each individual scope needs, the OR operator was chosen because it represents well the operation of keeping - # a property if either A or B needs it - def __or__(self, other): - # XXX: note that this doesn't necessarily have to be OR operations between properties, we want the operations - # that broaden the scope, and not narrow it. - # XXX: in the case of topological_order, we want to keep the "topological" ordering if any of them requires it, - # so it also is an OR operator - return Scope( - include_blocks=self.include_blocks | other.include_blocks, - include_txs=self.include_txs | other.include_txs, - include_voided=self.include_voided | other.include_voided, - include_partial=self.include_partial | other.include_partial, - topological_order=self.topological_order | other.topological_order, - ) - - def matches(self, tx: BaseTransaction) -> bool: - """ Check if a transaction matches this scope, True means the index is interested in this transaction. - """ - if tx.is_block and not self.include_blocks: - return False - if tx.is_transaction and not self.include_txs: - return False - tx_meta = tx.get_metadata() - if tx_meta.voided_by and not self.include_voided: - return False - if not tx_meta.validation.is_fully_connected() and not self.include_partial: - return False - # XXX: self.topologial_order doesn't affect self.match() - # passed all checks - return True - - def get_iterator(self, tx_storage: 'TransactionStorage') -> Iterator[BaseTransaction]: - iterator: Iterator[BaseTransaction] - # XXX: this is to mark if the chosen iterator will yield partial transactions - iterator_covers_partial: bool - if self.topological_order: - iterator = tx_storage.topological_iterator() - iterator_covers_partial = False - else: - iterator = tx_storage.get_all_transactions() - iterator_covers_partial = True - for tx in iterator: - if self.matches(tx): - yield tx - if self.include_partial and not iterator_covers_partial: - # if partial transactions are needed and were not already covered, we use get_all_transactions, which - # includes partial transactions, to yield them, skipping all that aren't partial - for tx in tx_storage.get_all_transactions(): - tx_meta = tx.get_metadata() - if tx_meta.validation.is_fully_connected(): - continue - if self.matches(tx): - yield tx class BaseIndex(ABC): diff --git a/hathor/indexes/deps_index.py b/hathor/indexes/deps_index.py index f4529c7e3..81362deac 100644 --- a/hathor/indexes/deps_index.py +++ b/hathor/indexes/deps_index.py @@ -15,7 +15,8 @@ from abc import abstractmethod from typing import TYPE_CHECKING, Iterator, List -from hathor.indexes.base_index import BaseIndex, Scope +from hathor.indexes.base_index import BaseIndex +from hathor.indexes.scope import Scope from hathor.transaction import BaseTransaction, Block if TYPE_CHECKING: # pragma: no cover diff --git a/hathor/indexes/height_index.py b/hathor/indexes/height_index.py index fd591951e..2a62cfc2c 100644 --- a/hathor/indexes/height_index.py +++ b/hathor/indexes/height_index.py @@ -15,7 +15,8 @@ from abc import abstractmethod from typing import List, NamedTuple, Optional, Tuple -from hathor.indexes.base_index import BaseIndex, Scope +from hathor.indexes.base_index import BaseIndex +from hathor.indexes.scope import Scope from hathor.transaction import BaseTransaction, Block from hathor.transaction.genesis import BLOCK_GENESIS from hathor.util import not_none diff --git a/hathor/indexes/info_index.py b/hathor/indexes/info_index.py index 6da5345e9..e69edbb0f 100644 --- a/hathor/indexes/info_index.py +++ b/hathor/indexes/info_index.py @@ -16,7 +16,8 @@ from structlog import get_logger -from hathor.indexes.base_index import BaseIndex, Scope +from hathor.indexes.base_index import BaseIndex +from hathor.indexes.scope import Scope from hathor.transaction import BaseTransaction logger = get_logger() diff --git a/hathor/indexes/manager.py b/hathor/indexes/manager.py index 50fe42753..d6899f6a2 100644 --- a/hathor/indexes/manager.py +++ b/hathor/indexes/manager.py @@ -25,8 +25,8 @@ from hathor.indexes.height_index import HeightIndex from hathor.indexes.info_index import InfoIndex from hathor.indexes.mempool_tips_index import MempoolTipsIndex -from hathor.indexes.timestamp_index import TimestampIndex -from hathor.indexes.tips_index import TipsIndex +from hathor.indexes.timestamp_index import ScopeType as TimestampScopeType, TimestampIndex +from hathor.indexes.tips_index import ScopeType as TipsScopeType, TipsIndex from hathor.indexes.tokens_index import TokensIndex from hathor.indexes.utxo_index import UtxoIndex from hathor.transaction import BaseTransaction @@ -82,24 +82,21 @@ def __init_checks__(self): def iter_all_indexes(self) -> Iterator[BaseIndex]: """ Iterate over all of the indexes abstracted by this manager, hiding their specific implementation details""" - yield self.info - yield self.all_tips - yield self.block_tips - yield self.tx_tips - yield self.sorted_all - yield self.sorted_blocks - yield self.sorted_txs - yield self.height - if self.deps is not None: - yield self.deps - if self.mempool_tips is not None: - yield self.mempool_tips - if self.addresses is not None: - yield self.addresses - if self.tokens is not None: - yield self.tokens - if self.utxo is not None: - yield self.utxo + return filter(None, [ + self.info, + self.all_tips, + self.block_tips, + self.tx_tips, + self.sorted_all, + self.sorted_blocks, + self.sorted_txs, + self.height, + self.deps, + self.mempool_tips, + self.addresses, + self.tokens, + self.utxo, + ]) @abstractmethod def enable_address_index(self, pubsub: 'PubSubManager') -> None: @@ -289,13 +286,13 @@ def __init__(self) -> None: from hathor.indexes.memory_tips_index import MemoryTipsIndex self.info = MemoryInfoIndex() - self.all_tips = MemoryTipsIndex(all=True) - self.block_tips = MemoryTipsIndex(blocks=True) - self.tx_tips = MemoryTipsIndex(txs=True) + self.all_tips = MemoryTipsIndex(scope_type=TipsScopeType.ALL) + self.block_tips = MemoryTipsIndex(scope_type=TipsScopeType.BLOCKS) + self.tx_tips = MemoryTipsIndex(scope_type=TipsScopeType.TXS) - self.sorted_all = MemoryTimestampIndex(all=True) - self.sorted_blocks = MemoryTimestampIndex(blocks=True) - self.sorted_txs = MemoryTimestampIndex(txs=True) + self.sorted_all = MemoryTimestampIndex(scope_type=TimestampScopeType.ALL) + self.sorted_blocks = MemoryTimestampIndex(scope_type=TimestampScopeType.BLOCKS) + self.sorted_txs = MemoryTimestampIndex(scope_type=TimestampScopeType.TXS) self.addresses = None self.tokens = None @@ -344,13 +341,13 @@ def __init__(self, db: 'rocksdb.DB') -> None: self.info = RocksDBInfoIndex(self._db) self.height = RocksDBHeightIndex(self._db) - self.all_tips = PartialRocksDBTipsIndex(self._db, all=True) - self.block_tips = PartialRocksDBTipsIndex(self._db, blocks=True) - self.tx_tips = PartialRocksDBTipsIndex(self._db, txs=True) + self.all_tips = PartialRocksDBTipsIndex(self._db, scope_type=TipsScopeType.ALL) + self.block_tips = PartialRocksDBTipsIndex(self._db, scope_type=TipsScopeType.BLOCKS) + self.tx_tips = PartialRocksDBTipsIndex(self._db, scope_type=TipsScopeType.TXS) - self.sorted_all = RocksDBTimestampIndex(self._db, all=True) - self.sorted_blocks = RocksDBTimestampIndex(self._db, blocks=True) - self.sorted_txs = RocksDBTimestampIndex(self._db, txs=True) + self.sorted_all = RocksDBTimestampIndex(self._db, scope_type=TimestampScopeType.ALL) + self.sorted_blocks = RocksDBTimestampIndex(self._db, scope_type=TimestampScopeType.BLOCKS) + self.sorted_txs = RocksDBTimestampIndex(self._db, scope_type=TimestampScopeType.TXS) self.addresses = None self.tokens = None diff --git a/hathor/indexes/memory_timestamp_index.py b/hathor/indexes/memory_timestamp_index.py index d749a4924..523e1bb3e 100644 --- a/hathor/indexes/memory_timestamp_index.py +++ b/hathor/indexes/memory_timestamp_index.py @@ -17,7 +17,7 @@ from sortedcontainers import SortedKeyList from structlog import get_logger -from hathor.indexes.timestamp_index import RangeIdx, TimestampIndex +from hathor.indexes.timestamp_index import RangeIdx, ScopeType, TimestampIndex from hathor.indexes.utils import ( TransactionIndexElement, get_newer_sorted_key_list, @@ -35,8 +35,8 @@ class MemoryTimestampIndex(TimestampIndex): _index: 'SortedKeyList[TransactionIndexElement]' - def __init__(self, *, txs: bool = False, blocks: bool = False, all: bool = False): - super().__init__(txs=txs, blocks=blocks, all=all) + def __init__(self, *, scope_type: ScopeType): + super().__init__(scope_type=scope_type) self.log = logger.new() self.force_clear() diff --git a/hathor/indexes/memory_tips_index.py b/hathor/indexes/memory_tips_index.py index 12134ba21..b8b8c6310 100644 --- a/hathor/indexes/memory_tips_index.py +++ b/hathor/indexes/memory_tips_index.py @@ -18,7 +18,7 @@ from intervaltree import Interval, IntervalTree from structlog import get_logger -from hathor.indexes.tips_index import TipsIndex +from hathor.indexes.tips_index import ScopeType, TipsIndex from hathor.transaction import BaseTransaction logger = get_logger() @@ -47,8 +47,8 @@ class MemoryTipsIndex(TipsIndex): # It is useful because the interval tree allows access only by the interval. tx_last_interval: Dict[bytes, Interval] - def __init__(self, *, txs: bool = False, blocks: bool = False, all: bool = False): - super().__init__(txs=txs, blocks=blocks, all=all) + def __init__(self, *, scope_type: ScopeType): + super().__init__(scope_type=scope_type) self.log = logger.new() self.tree = IntervalTree() self.tx_last_interval = {} diff --git a/hathor/indexes/mempool_tips_index.py b/hathor/indexes/mempool_tips_index.py index fde3557d4..784327f69 100644 --- a/hathor/indexes/mempool_tips_index.py +++ b/hathor/indexes/mempool_tips_index.py @@ -18,7 +18,8 @@ import structlog -from hathor.indexes.base_index import BaseIndex, Scope +from hathor.indexes.base_index import BaseIndex +from hathor.indexes.scope import Scope from hathor.transaction import BaseTransaction, Transaction from hathor.util import not_none diff --git a/hathor/indexes/partial_rocksdb_tips_index.py b/hathor/indexes/partial_rocksdb_tips_index.py index 3bb8971fb..b41252d11 100644 --- a/hathor/indexes/partial_rocksdb_tips_index.py +++ b/hathor/indexes/partial_rocksdb_tips_index.py @@ -22,6 +22,7 @@ from hathor.indexes.memory_tips_index import MemoryTipsIndex from hathor.indexes.rocksdb_utils import RocksDBIndexUtils +from hathor.indexes.tips_index import ScopeType from hathor.util import LogDuration if TYPE_CHECKING: # pragma: no cover @@ -111,18 +112,11 @@ class PartialRocksDBTipsIndex(MemoryTipsIndex, RocksDBIndexUtils): # It is useful because the interval tree allows access only by the interval. tx_last_interval: Dict[bytes, Interval] - def __init__(self, db: 'rocksdb.DB', *, txs: bool = False, blocks: bool = False, all: bool = False): - MemoryTipsIndex.__init__(self, txs=txs, blocks=blocks, all=all) - name: str - if txs: - name = 'txs' - elif blocks: - name = 'blocks' - elif all: - name = 'all' + def __init__(self, db: 'rocksdb.DB', *, scope_type: ScopeType): + MemoryTipsIndex.__init__(self, scope_type=scope_type) + self._name = scope_type.get_name() self.log = logger.new() # XXX: override MemoryTipsIndex logger so it shows the correct module - RocksDBIndexUtils.__init__(self, db, f'tips-{name}'.encode()) - self._name = name + RocksDBIndexUtils.__init__(self, db, f'tips-{self._name}'.encode()) def get_db_name(self) -> Optional[str]: return f'tips_{self._name}' diff --git a/hathor/indexes/rocksdb_timestamp_index.py b/hathor/indexes/rocksdb_timestamp_index.py index d72ada0f4..01e4e609b 100644 --- a/hathor/indexes/rocksdb_timestamp_index.py +++ b/hathor/indexes/rocksdb_timestamp_index.py @@ -17,7 +17,7 @@ from structlog import get_logger from hathor.indexes.rocksdb_utils import RocksDBIndexUtils, incr_key -from hathor.indexes.timestamp_index import RangeIdx, TimestampIndex +from hathor.indexes.timestamp_index import RangeIdx, ScopeType, TimestampIndex from hathor.transaction import BaseTransaction from hathor.util import collect_n, skip_n @@ -38,18 +38,11 @@ class RocksDBTimestampIndex(TimestampIndex, RocksDBIndexUtils): It works nicely because rocksdb uses a tree sorted by key under the hood. """ - def __init__(self, db: 'rocksdb.DB', *, txs: bool = False, blocks: bool = False, all: bool = False): - TimestampIndex.__init__(self, txs=txs, blocks=blocks, all=all) - name: str - if txs: - name = 'txs' - elif blocks: - name = 'blocks' - elif all: - name = 'all' + def __init__(self, db: 'rocksdb.DB', *, scope_type: ScopeType): + TimestampIndex.__init__(self, scope_type=scope_type) + self._name = scope_type.get_name() self.log = logger.new() - RocksDBIndexUtils.__init__(self, db, f'timestamp-sorted-{name}'.encode()) - self._name = name + RocksDBIndexUtils.__init__(self, db, f'timestamp-sorted-{self._name}'.encode()) def get_db_name(self) -> Optional[str]: return f'timestamp_{self._name}' diff --git a/hathor/indexes/scope.py b/hathor/indexes/scope.py new file mode 100644 index 000000000..0a1e84c35 --- /dev/null +++ b/hathor/indexes/scope.py @@ -0,0 +1,91 @@ +# Copyright 2023 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import TYPE_CHECKING, Iterator, NamedTuple + +from hathor.transaction.base_transaction import BaseTransaction + +if TYPE_CHECKING: # pragma: no cover + from hathor.transaction.storage import TransactionStorage + + +class Scope(NamedTuple): + """ This class models the scope of transactions that an index is interested in. + + It is used for both selecting the optimal iterator for all the indexes that need to be initialized and for + filtering which transactions are fed to the index. + """ + include_blocks: bool + include_txs: bool + include_voided: bool + # XXX: these have a default value since it should be really rare to have it different + include_partial: bool = False + topological_order: bool = True # if False than ordering doesn't matter + + # XXX: this is used to join the scope of multiple indexes to get an overall scope that includes everything that + # each individual scope needs, the OR operator was chosen because it represents well the operation of keeping + # a property if either A or B needs it + def __or__(self, other): + # XXX: note that this doesn't necessarily have to be OR operations between properties, we want the operations + # that broaden the scope, and not narrow it. + # XXX: in the case of topological_order, we want to keep the "topological" ordering if any of them requires it, + # so it also is an OR operator + return Scope( + include_blocks=self.include_blocks | other.include_blocks, + include_txs=self.include_txs | other.include_txs, + include_voided=self.include_voided | other.include_voided, + include_partial=self.include_partial | other.include_partial, + topological_order=self.topological_order | other.topological_order, + ) + + def matches(self, tx: BaseTransaction) -> bool: + """ Check if a transaction matches this scope, True means the index is interested in this transaction. + """ + if tx.is_block and not self.include_blocks: + return False + if tx.is_transaction and not self.include_txs: + return False + tx_meta = tx.get_metadata() + if tx_meta.voided_by and not self.include_voided: + return False + if not tx_meta.validation.is_fully_connected() and not self.include_partial: + return False + # XXX: self.topologial_order doesn't affect self.match() + # passed all checks + return True + + def get_iterator(self, tx_storage: 'TransactionStorage') -> Iterator[BaseTransaction]: + """ This method returns an iterator that only yields transaction that match the current scope. + """ + iterator: Iterator[BaseTransaction] + # XXX: this is to mark if the chosen iterator will yield partial transactions + iterator_covers_partial: bool + if self.topological_order: + iterator = tx_storage.topological_iterator() + iterator_covers_partial = False + else: + iterator = tx_storage.get_all_transactions() + iterator_covers_partial = True + for tx in iterator: + if self.matches(tx): + yield tx + if self.include_partial and not iterator_covers_partial: + # if partial transactions are needed and were not already covered, we use get_all_transactions, which + # includes partial transactions, to yield them, skipping all that aren't partial + for tx in tx_storage.get_all_transactions(): + tx_meta = tx.get_metadata() + if tx_meta.validation.is_fully_connected(): + continue + if self.matches(tx): + yield tx diff --git a/hathor/indexes/timestamp_index.py b/hathor/indexes/timestamp_index.py index bea994083..a738dfc47 100644 --- a/hathor/indexes/timestamp_index.py +++ b/hathor/indexes/timestamp_index.py @@ -13,32 +13,37 @@ # limitations under the License. from abc import abstractmethod +from enum import Enum from typing import Iterator, List, NamedTuple, Optional, Tuple from structlog import get_logger -from hathor.indexes.base_index import BaseIndex, Scope +from hathor.indexes.base_index import BaseIndex +from hathor.indexes.scope import Scope from hathor.transaction import BaseTransaction logger = get_logger() -SCOPE_ALL = Scope( - include_blocks=True, - include_txs=True, - include_voided=True, -) -SCOPE_TXS = Scope( - include_blocks=False, - include_txs=True, - include_voided=False, -) +class ScopeType(Enum): + ALL = Scope( + include_blocks=True, + include_txs=True, + include_voided=True, + ) + TXS = Scope( + include_blocks=False, + include_txs=True, + include_voided=False, + ) + BLOCKS = Scope( + include_blocks=True, + include_txs=False, + include_voided=True, + ) -SCOPE_BLOCKS = Scope( - include_blocks=True, - include_txs=False, - include_voided=True, -) + def get_name(self) -> str: + return self.name.lower() class RangeIdx(NamedTuple): @@ -50,19 +55,11 @@ class TimestampIndex(BaseIndex): """ Index of transactions sorted by their timestamps. """ - def __init__(self, *, txs: bool = False, blocks: bool = False, all: bool = False): - if sum([txs, blocks, all]) != 1: - raise TypeError('Exactly one of these: txs, blocks, all, must be set to True') - self._scope: Scope - if txs: - self._scope = SCOPE_TXS - elif blocks: - self._scope = SCOPE_BLOCKS - elif all: - self._scope = SCOPE_ALL + def __init__(self, *, scope_type: ScopeType): + self._scope_type = scope_type def get_scope(self) -> Scope: - return self._scope + return self._scope_type.value def init_loop_step(self, tx: BaseTransaction) -> None: self.add_tx(tx) diff --git a/hathor/indexes/tips_index.py b/hathor/indexes/tips_index.py index b1cedf593..f9fe09c67 100644 --- a/hathor/indexes/tips_index.py +++ b/hathor/indexes/tips_index.py @@ -13,33 +13,38 @@ # limitations under the License. from abc import abstractmethod +from enum import Enum from typing import Set from intervaltree import Interval from structlog import get_logger -from hathor.indexes.base_index import BaseIndex, Scope +from hathor.indexes.base_index import BaseIndex +from hathor.indexes.scope import Scope from hathor.transaction import BaseTransaction logger = get_logger() -SCOPE_ALL = Scope( - include_blocks=True, - include_txs=True, - include_voided=True, -) -SCOPE_TXS = Scope( - include_blocks=False, - include_txs=True, - include_voided=False, -) +class ScopeType(Enum): + ALL = Scope( + include_blocks=True, + include_txs=True, + include_voided=True, + ) + TXS = Scope( + include_blocks=False, + include_txs=True, + include_voided=False, + ) + BLOCKS = Scope( + include_blocks=True, + include_txs=False, + include_voided=True, + ) -SCOPE_BLOCKS = Scope( - include_blocks=True, - include_txs=False, - include_voided=True, -) + def get_name(self) -> str: + return self.name.lower() class TipsIndex(BaseIndex): @@ -56,19 +61,11 @@ class TipsIndex(BaseIndex): TODO Use an interval tree stored in disk, possibly using a B-tree. """ - def __init__(self, *, txs: bool = False, blocks: bool = False, all: bool = False): - if sum([txs, blocks, all]) != 1: - raise TypeError('Exactly one of these: txs, blocks, all, must be set to True') - self._scope: Scope - if txs: - self._scope = SCOPE_TXS - elif blocks: - self._scope = SCOPE_BLOCKS - elif all: - self._scope = SCOPE_ALL + def __init__(self, *, scope_type: ScopeType): + self._scope_type = scope_type def get_scope(self) -> Scope: - return self._scope + return self._scope_type.value @abstractmethod def add_tx(self, tx: BaseTransaction) -> bool: diff --git a/hathor/indexes/tokens_index.py b/hathor/indexes/tokens_index.py index 5a5228927..9528ee32b 100644 --- a/hathor/indexes/tokens_index.py +++ b/hathor/indexes/tokens_index.py @@ -15,7 +15,8 @@ from abc import ABC, abstractmethod from typing import Iterator, List, NamedTuple, Optional, Tuple -from hathor.indexes.base_index import BaseIndex, Scope +from hathor.indexes.base_index import BaseIndex +from hathor.indexes.scope import Scope from hathor.transaction import BaseTransaction SCOPE = Scope( diff --git a/hathor/indexes/utxo_index.py b/hathor/indexes/utxo_index.py index b559cffe0..8606b2ba5 100644 --- a/hathor/indexes/utxo_index.py +++ b/hathor/indexes/utxo_index.py @@ -19,7 +19,8 @@ from structlog import get_logger from hathor.conf import HathorSettings -from hathor.indexes.base_index import BaseIndex, Scope +from hathor.indexes.base_index import BaseIndex +from hathor.indexes.scope import Scope from hathor.transaction import BaseTransaction, TxOutput from hathor.transaction.scripts import parse_address_script from hathor.util import sorted_merger diff --git a/hathor/transaction/storage/rocksdb_storage.py b/hathor/transaction/storage/rocksdb_storage.py index 7ff540359..1a03df316 100644 --- a/hathor/transaction/storage/rocksdb_storage.py +++ b/hathor/transaction/storage/rocksdb_storage.py @@ -16,7 +16,6 @@ from structlog import get_logger -from hathor.conf import HathorSettings from hathor.indexes import IndexesManager, MemoryIndexesManager, RocksDBIndexesManager from hathor.storage import RocksDBStorage from hathor.transaction.storage.exceptions import TransactionDoesNotExist @@ -30,7 +29,6 @@ from hathor.transaction import BaseTransaction, TransactionMetadata logger = get_logger() -settings = HathorSettings() _DB_NAME = 'data_v2.db' _CF_NAME_TX = b'tx' diff --git a/tests/tx/test_indexes2.py b/tests/tx/test_indexes2.py index 6a7ac47fe..ea8aea4ea 100644 --- a/tests/tx/test_indexes2.py +++ b/tests/tx/test_indexes2.py @@ -44,9 +44,9 @@ def test_timestamp_index(self): # setup two indexes with different backends from hathor.indexes.memory_timestamp_index import MemoryTimestampIndex from hathor.indexes.rocksdb_timestamp_index import RocksDBTimestampIndex - from hathor.indexes.timestamp_index import RangeIdx - rocksdb_index = RocksDBTimestampIndex(self.create_tmp_rocksdb_db(), all=True) - memory_index = MemoryTimestampIndex(all=True) + from hathor.indexes.timestamp_index import RangeIdx, ScopeType + rocksdb_index = RocksDBTimestampIndex(self.create_tmp_rocksdb_db(), scope_type=ScopeType.ALL) + memory_index = MemoryTimestampIndex(scope_type=ScopeType.ALL) for tx in self.transactions: rocksdb_index.add_tx(tx) memory_index.add_tx(tx)