Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion hathor/builder/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class BuildArtifacts(NamedTuple):
tx_storage: TransactionStorage
feature_service: FeatureService
bit_signaling_service: BitSignalingService
indexes: Optional[IndexesManager]
indexes: IndexesManager
wallet: Optional[BaseWallet]
rocksdb_storage: RocksDBStorage
stratum_factory: Optional[StratumFactory]
Expand Down
1 change: 0 additions & 1 deletion hathor/builder/resources_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,6 @@ def create_resources(self) -> server.Site:
parent.putChild(url_path, resource)

# Websocket resource
assert self.manager.tx_storage.indexes is not None
ws_factory = HathorAdminWebsocketFactory(manager=self.manager,
metrics=self.manager.metrics,
address_index=self.manager.tx_storage.indexes.addresses)
Expand Down
5 changes: 0 additions & 5 deletions hathor/consensus/block_consensus.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,6 @@ def _nc_execute_calls(self, block: Block, *, is_reorg: bool) -> None:

# Update indexes. This must be after metadata is updated.
assert tx.storage is not None
assert tx.storage.indexes is not None
tx.storage.indexes.handle_contract_execution(tx)

# Pubsub event to indicate execution success
Expand Down Expand Up @@ -413,9 +412,7 @@ def update_voided_info(self, block: Block) -> None:
return

assert block.storage is not None

storage = block.storage
assert storage.indexes is not None

# Union of voided_by of parents
voided_by: set[bytes] = self.union_voided_by_from_parents(block)
Expand Down Expand Up @@ -517,7 +514,6 @@ def mark_as_reorg_if_needed(self, common_block: Block, new_best_block: Block) ->
"""Mark as reorg only if reorg size > 0."""
assert new_best_block.storage is not None
storage = new_best_block.storage
assert storage.indexes is not None
_, old_best_block_hash = storage.indexes.height.get_height_tip()
old_best_block = storage.get_transaction(old_best_block_hash)
assert isinstance(old_best_block, Block)
Expand Down Expand Up @@ -745,7 +741,6 @@ def remove_first_block_markers(self, block: Block) -> None:
if tx.is_nano_contract():
if meta.nc_execution == NCExecutionState.SUCCESS:
assert tx.storage is not None
assert tx.storage.indexes is not None
tx.storage.indexes.handle_contract_unexecution(tx)
meta.nc_execution = NCExecutionState.PENDING
meta.nc_calls = None
Expand Down
7 changes: 1 addition & 6 deletions hathor/consensus/consensus.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ def unsafe_update(self, base: BaseTransaction) -> None:

assert base.storage is not None
storage = base.storage
assert storage.indexes is not None
best_height, best_tip = storage.indexes.height.get_height_tip()

# This has to be called before the removal of vertices, otherwise this call may fail.
Expand All @@ -139,8 +138,7 @@ def unsafe_update(self, base: BaseTransaction) -> None:
# signal a mempool tips index update for all affected transactions,
# because that index is used on _compute_vertices_that_became_invalid below.
for tx_affected in _sorted_affected_txs(context.txs_affected):
if storage.indexes.mempool_tips is not None:
storage.indexes.mempool_tips.update(tx_affected)
storage.indexes.mempool_tips.update(tx_affected)

txs_to_remove: list[BaseTransaction] = []
new_best_height, new_best_tip = storage.indexes.height.get_height_tip()
Expand Down Expand Up @@ -192,7 +190,6 @@ def unsafe_update(self, base: BaseTransaction) -> None:
# finally signal an index update for all affected transactions
for tx_affected in _sorted_affected_txs(context.txs_affected):
assert tx_affected.storage is not None
assert tx_affected.storage.indexes is not None
tx_affected.storage.indexes.update(tx_affected)
context.pubsub.publish(HathorEvents.CONSENSUS_TX_UPDATE, tx=tx_affected)

Expand Down Expand Up @@ -335,8 +332,6 @@ def _compute_vertices_that_became_invalid(
"""This method will look for transactions in the mempool that have become invalid after a reorg."""
from hathor.transaction.storage.traversal import BFSTimestampWalk
from hathor.transaction.validation_state import ValidationState
assert storage.indexes is not None
assert storage.indexes.mempool_tips is not None

mempool_tips = list(storage.indexes.mempool_tips.iter(storage))
if not mempool_tips:
Expand Down
28 changes: 6 additions & 22 deletions hathor/indexes/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class IndexesManager(ABC):
sorted_txs: TimestampIndex

height: HeightIndex
mempool_tips: Optional[MempoolTipsIndex]
mempool_tips: MempoolTipsIndex
addresses: Optional[AddressIndex]
tokens: Optional[TokensIndex]
utxo: Optional[UtxoIndex]
Expand Down Expand Up @@ -120,22 +120,11 @@ def enable_utxo_index(self) -> None:
"""Enable UTXO index. It does nothing if it has already been enabled."""
raise NotImplementedError

@abstractmethod
def enable_mempool_index(self) -> None:
"""Enable mempool index. It does nothing if it has already been enabled."""
raise NotImplementedError

@abstractmethod
def enable_nc_indexes(self) -> None:
"""Enable Nano Contract related indexes."""
raise NotImplementedError

def force_clear_all(self) -> None:
""" Force clear all indexes.
"""
for index in self.iter_all_indexes():
index.force_clear()

def _manually_initialize(self, tx_storage: 'TransactionStorage') -> None:
""" Initialize the indexes, checking the indexes that need initialization, and the optimal iterator to use.
"""
Expand Down Expand Up @@ -205,8 +194,7 @@ def _manually_initialize(self, tx_storage: 'TransactionStorage') -> None:
def update(self, tx: BaseTransaction) -> None:
""" This is the new update method that indexes should use instead of add_tx/del_tx
"""
if self.mempool_tips:
self.mempool_tips.update(tx)
self.mempool_tips.update(tx)
if self.utxo:
self.utxo.update(tx)

Expand Down Expand Up @@ -413,7 +401,7 @@ def del_tx(self, tx: BaseTransaction, *, remove_all: bool = False, relax_assert:
self.info.update_counts(tx, remove=True)

# mempool will pick-up if the transaction is voided/invalid and remove it
if self.mempool_tips is not None and tx.storage.transaction_exists(tx.hash):
if tx.storage.transaction_exists(tx.hash):
logger.debug('remove from mempool tips', tx=tx.hash_hex)
self.mempool_tips.update(tx, force_remove=True)

Expand All @@ -428,6 +416,7 @@ def del_tx(self, tx: BaseTransaction, *, remove_all: bool = False, relax_assert:

class RocksDBIndexesManager(IndexesManager):
def __init__(self, rocksdb_storage: 'RocksDBStorage', *, settings: HathorSettings) -> None:
from hathor.indexes.memory_mempool_tips_index import MemoryMempoolTipsIndex
from hathor.indexes.rocksdb_height_index import RocksDBHeightIndex
from hathor.indexes.rocksdb_info_index import RocksDBInfoIndex
from hathor.indexes.rocksdb_timestamp_index import RocksDBTimestampIndex
Expand All @@ -437,6 +426,8 @@ def __init__(self, rocksdb_storage: 'RocksDBStorage', *, settings: HathorSetting

self.info = RocksDBInfoIndex(self._db, settings=settings)
self.height = RocksDBHeightIndex(self._db, settings=settings)
# XXX: use of RocksDBMempoolTipsIndex is very slow and was suspended
self.mempool_tips = MemoryMempoolTipsIndex(settings=self.settings)

self.sorted_all = RocksDBTimestampIndex(self._db, scope_type=TimestampScopeType.ALL, settings=settings)
self.sorted_blocks = RocksDBTimestampIndex(self._db, scope_type=TimestampScopeType.BLOCKS, settings=settings)
Expand All @@ -445,7 +436,6 @@ def __init__(self, rocksdb_storage: 'RocksDBStorage', *, settings: HathorSetting
self.addresses = None
self.tokens = None
self.utxo = None
self.mempool_tips = None
self.nc_creation = None
self.nc_history = None
self.blueprints = None
Expand All @@ -469,12 +459,6 @@ def enable_utxo_index(self) -> None:
if self.utxo is None:
self.utxo = RocksDBUtxoIndex(self._db, settings=self.settings)

def enable_mempool_index(self) -> None:
from hathor.indexes.memory_mempool_tips_index import MemoryMempoolTipsIndex
if self.mempool_tips is None:
# XXX: use of RocksDBMempoolTipsIndex is very slow and was suspended
self.mempool_tips = MemoryMempoolTipsIndex(settings=self.settings)

def enable_nc_indexes(self) -> None:
from hathor.indexes.blueprint_timestamp_index import BlueprintTimestampIndex
from hathor.indexes.rocksdb_blueprint_history_index import RocksDBBlueprintHistoryIndex
Expand Down
3 changes: 0 additions & 3 deletions hathor/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,8 +439,6 @@ def _initialize_components(self) -> None:
self.wallet._manually_initialize()

self.tx_storage.pre_init()
assert self.tx_storage.indexes is not None

self._bit_signaling_service.start()

started_at = int(time.time())
Expand Down Expand Up @@ -517,7 +515,6 @@ def _verify_checkpoints(self) -> None:

This method needs the essential indexes to be already initialized.
"""
assert self.tx_storage.indexes is not None
# based on the current best-height, filter-out checkpoints that aren't expected to exist in the database
best_height = self.tx_storage.get_height_best_block()
expected_checkpoints = [cp for cp in self.checkpoints if cp.height <= best_height]
Expand Down
1 change: 0 additions & 1 deletion hathor/nanocontracts/resources/history.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ def render_GET(self, request: 'Request') -> bytes:
set_cors(request, 'GET')

tx_storage = self.manager.tx_storage
assert tx_storage.indexes is not None
if tx_storage.indexes.nc_history is None:
request.setResponseCode(503)
error_response = ErrorResponse(success=False, error='Nano contract history index not initialized')
Expand Down
1 change: 0 additions & 1 deletion hathor/nanocontracts/resources/nc_creation.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ def __init__(self, manager: HathorManager) -> None:
super().__init__()
self.manager = manager
self.tx_storage = self.manager.tx_storage
assert self.tx_storage.indexes is not None
self.nc_creation_index = self.tx_storage.indexes.nc_creation
self.nc_history_index = self.tx_storage.indexes.nc_history
self.bp_history_index = self.tx_storage.indexes.blueprint_history
Expand Down
1 change: 0 additions & 1 deletion hathor/nanocontracts/resources/on_chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ def render_GET(self, request: Request) -> bytes:
set_cors(request, 'GET')

tx_storage = self.manager.tx_storage
assert tx_storage.indexes is not None
if tx_storage.indexes.blueprints is None:
request.setResponseCode(503)
error_response = ErrorResponse(success=False, error='Blueprint index not initialized')
Expand Down
18 changes: 0 additions & 18 deletions hathor/nanocontracts/resources/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,6 @@ def render_GET(self, request: 'Request') -> bytes:

if params.block_height is not None:
# Get hash of the block with the height
if self.manager.tx_storage.indexes is None:
# No indexes enabled in the storage
request.setResponseCode(503)
error_response = ErrorResponse(
success=False,
error='No indexes enabled in the storage, so we can\'t filter by block height.'
)
return error_response.json_dumpb()

block_hash = self.manager.tx_storage.indexes.height.get(params.block_height)
if block_hash is None:
# No block hash was found with this height
Expand All @@ -103,15 +94,6 @@ def render_GET(self, request: 'Request') -> bytes:
)
return error_response.json_dumpb()
elif params.timestamp is not None:
if self.manager.tx_storage.indexes is None:
# No indexes enabled in the storage
request.setResponseCode(503)
error_response = ErrorResponse(
success=False,
error='No indexes enabled in the storage, so we can\'t filter by timestamp.'
)
return error_response.json_dumpb()

block_hashes, has_more = self.manager.tx_storage.indexes.sorted_blocks.get_older(
timestamp=params.timestamp,
hash_bytes=None,
Expand Down
5 changes: 0 additions & 5 deletions hathor/p2p/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,11 +259,6 @@ def set_manager(self, manager: 'HathorManager') -> None:
raise TypeError('Class built incorrectly without any enabled sync version')

self.manager = manager
if self.is_sync_version_available(SyncVersion.V2):
assert self.manager.tx_storage.indexes is not None
indexes = self.manager.tx_storage.indexes
self.log.debug('enable sync-v2 indexes')
indexes.enable_mempool_index()

def add_listen_address_description(self, addr: str) -> None:
"""Add address to listen for incoming connections."""
Expand Down
6 changes: 0 additions & 6 deletions hathor/p2p/sync_v2/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,6 @@ def __init__(
def get_status(self) -> dict[str, Any]:
""" Return the status of the sync.
"""
assert self.tx_storage.indexes is not None
assert self.tx_storage.indexes.mempool_tips is not None
tips = self.tx_storage.indexes.mempool_tips.get()
tips_limited, tips_has_more = collect_n(iter(tips), MAX_MEMPOOL_STATUS_TIPS)
res = {
Expand Down Expand Up @@ -361,7 +359,6 @@ def run_sync_blocks(self) -> Generator[Any, Any, bool]:

Notice that we might already have all other peer's blocks while the other peer is still syncing.
"""
assert self.tx_storage.indexes is not None
self.state = PeerState.SYNCING_BLOCKS

# Get my best block.
Expand Down Expand Up @@ -462,8 +459,6 @@ def send_get_tips(self) -> None:
def handle_get_tips(self, _payload: str) -> None:
""" Handle a GET-TIPS message.
"""
assert self.tx_storage.indexes is not None
assert self.tx_storage.indexes.mempool_tips is not None
if self._is_streaming:
self.log.warn('can\'t send while streaming') # XXX: or can we?
self.send_message(ProtocolMessages.MEMPOOL_END)
Expand Down Expand Up @@ -641,7 +636,6 @@ def send_get_peer_block_hashes(self, heights: list[int]) -> None:
def handle_get_peer_block_hashes(self, payload: str) -> None:
""" Handle a GET-PEER-BLOCK-HASHES message.
"""
assert self.tx_storage.indexes is not None
heights = json.loads(payload)
if len(heights) > 20:
self.log.info('too many heights', heights_qty=len(heights))
Expand Down
2 changes: 0 additions & 2 deletions hathor/transaction/resources/block_at_height.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ def render_GET(self, request: 'Request') -> bytes:

:rtype: string (json)
"""
assert self.manager.tx_storage.indexes is not None

request.setHeader(b'content-type', b'application/json; charset=utf-8')
set_cors(request, 'GET')

Expand Down
8 changes: 1 addition & 7 deletions hathor/transaction/resources/mempool.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,8 @@ def render_GET(self, request: 'Request') -> bytes:
return json_dumpb(data)

def _get_from_index(self, index_source: IndexSource) -> Iterator[Transaction]:
tx_storage = self.manager.tx_storage
assert tx_storage.indexes is not None
if index_source == IndexSource.ANY or index_source == IndexSource.MEMPOOL:
# XXX: if source is ANY we try to use the mempool when possible
if tx_storage.indexes.mempool_tips is None:
raise ValueError('mempool index is not enabled')
# XXX: if source is ANY we try to use the mempool
yield from self._get_from_mempool_tips_index()
elif index_source == IndexSource.TX_TIPS:
raise ValueError('tx-tips index has been removed')
Expand All @@ -99,8 +95,6 @@ def _get_from_index(self, index_source: IndexSource) -> Iterator[Transaction]:

def _get_from_mempool_tips_index(self) -> Iterator[Transaction]:
tx_storage = self.manager.tx_storage
assert tx_storage.indexes is not None
assert tx_storage.indexes.mempool_tips is not None
yield from tx_storage.indexes.mempool_tips.iter_all(tx_storage)


Expand Down
2 changes: 0 additions & 2 deletions hathor/transaction/resources/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ def get_tx_extra_data(
Returns success, tx serializes, metadata and spent outputs
"""
assert tx.storage is not None
assert tx.storage.indexes is not None

settings = get_global_settings()
serialized = tx.to_json(decode_script=True)
serialized['raw'] = tx.get_struct().hex()
Expand Down
1 change: 0 additions & 1 deletion hathor/transaction/resources/utxo_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ def render_GET(self, request: 'Request') -> bytes:

# setup
tx_storage = self.manager.tx_storage
assert tx_storage.indexes is not None
if tx_storage.indexes.utxo is None:
request.setResponseCode(503)
return json_dumpb({'success': False})
Expand Down
2 changes: 1 addition & 1 deletion hathor/transaction/storage/rocksdb_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@ def __init__(
*,
reactor: ReactorProtocol,
rocksdb_storage: RocksDBStorage,
indexes: Optional[IndexesManager] = None,
settings: 'HathorSettings',
vertex_parser: VertexParser,
nc_storage_factory: NCStorageFactory,
vertex_children_service: RocksDBVertexChildrenService,
indexes: IndexesManager,
cache_config: CacheConfig | None = None,
) -> None:
self._reactor = reactor
Expand Down
Loading
Loading