diff --git a/hathor/indexes/address_index.py b/hathor/indexes/address_index.py index ebf848ada..b8925b927 100644 --- a/hathor/indexes/address_index.py +++ b/hathor/indexes/address_index.py @@ -17,16 +17,17 @@ from structlog import get_logger -from hathor.indexes.base_index import BaseIndex +from hathor.indexes.tx_group_index import TxGroupIndex +from hathor.pubsub import HathorEvents from hathor.transaction import BaseTransaction if TYPE_CHECKING: # pragma: no cover - from hathor.pubsub import PubSubManager + from hathor.pubsub import EventArguments, PubSubManager logger = get_logger() -class AddressIndex(BaseIndex): +class AddressIndex(TxGroupIndex[str]): """ Index of inputs/outputs by address """ pubsub: Optional['PubSubManager'] @@ -34,7 +35,23 @@ class AddressIndex(BaseIndex): def init_loop_step(self, tx: BaseTransaction) -> None: self.add_tx(tx) - def publish_tx(self, tx: BaseTransaction, *, addresses: Optional[Iterable[str]] = None) -> None: + def _handle_tx_event(self, key: HathorEvents, args: 'EventArguments') -> None: + """ This method is called when pubsub publishes an event that we subscribed + """ + data = args.__dict__ + tx = data['tx'] + meta = tx.get_metadata() + if meta.has_voided_by_changed_since_last_call() or meta.has_spent_by_changed_since_last_call(): + self._publish_tx(tx) + + def _subscribe_pubsub_events(self) -> None: + """ Subscribe wallet index to receive voided/winner tx pubsub events + """ + assert self.pubsub is not None + # Subscribe to voided/winner events + self.pubsub.subscribe(HathorEvents.CONSENSUS_TX_UPDATE, self._handle_tx_event) + + def _publish_tx(self, tx: BaseTransaction, *, addresses: Optional[Iterable[str]] = None) -> None: """ Publish WALLET_ADDRESS_HISTORY for all addresses of a transaction. 
""" from hathor.pubsub import HathorEvents diff --git a/hathor/indexes/memory_address_index.py b/hathor/indexes/memory_address_index.py index f1cf3009b..3ab6b14f8 100644 --- a/hathor/indexes/memory_address_index.py +++ b/hathor/indexes/memory_address_index.py @@ -12,84 +12,45 @@ # See the License for the specific language governing permissions and # limitations under the License. -from collections import defaultdict -from typing import TYPE_CHECKING, DefaultDict, List, Optional, Set +from typing import TYPE_CHECKING, Iterable, List, Optional from structlog import get_logger from hathor.indexes.address_index import AddressIndex -from hathor.pubsub import HathorEvents +from hathor.indexes.memory_tx_group_index import MemoryTxGroupIndex from hathor.transaction import BaseTransaction if TYPE_CHECKING: # pragma: no cover - from hathor.pubsub import EventArguments, PubSubManager + from hathor.pubsub import PubSubManager logger = get_logger() -class MemoryAddressIndex(AddressIndex): +class MemoryAddressIndex(MemoryTxGroupIndex[str], AddressIndex): """ Index of inputs/outputs by address """ - index: DefaultDict[str, Set[bytes]] - def __init__(self, pubsub: Optional['PubSubManager'] = None) -> None: + super().__init__() self.pubsub = pubsub - self.force_clear() if self.pubsub: - self.subscribe_pubsub_events() + self._subscribe_pubsub_events() def get_db_name(self) -> Optional[str]: return None - def force_clear(self) -> None: - self.index = defaultdict(set) - - def subscribe_pubsub_events(self) -> None: - """ Subscribe wallet index to receive voided/winner tx pubsub events - """ - assert self.pubsub is not None - # Subscribe to voided/winner events - self.pubsub.subscribe(HathorEvents.CONSENSUS_TX_UPDATE, self.handle_tx_event) + def _extract_keys(self, tx: BaseTransaction) -> Iterable[str]: + return tx.get_related_addresses() def add_tx(self, tx: BaseTransaction) -> None: - """ Add tx inputs and outputs to the wallet index (indexed by its addresses). 
- """ - assert tx.hash is not None - - addresses = tx.get_related_addresses() - for address in addresses: - self.index[address].add(tx.hash) - - self.publish_tx(tx, addresses=addresses) - - def remove_tx(self, tx: BaseTransaction) -> None: - """ Remove tx inputs and outputs from the wallet index (indexed by its addresses). - """ - assert tx.hash is not None - - addresses = tx.get_related_addresses() - for address in addresses: - self.index[address].discard(tx.hash) - - def handle_tx_event(self, key: HathorEvents, args: 'EventArguments') -> None: - """ This method is called when pubsub publishes an event that we subscribed - """ - data = args.__dict__ - tx = data['tx'] - meta = tx.get_metadata() - if meta.has_voided_by_changed_since_last_call() or meta.has_spent_by_changed_since_last_call(): - self.publish_tx(tx) + super().add_tx(tx) + self._publish_tx(tx) def get_from_address(self, address: str) -> List[bytes]: - """ Get list of transaction hashes of an address - """ - return list(self.index[address]) + return list(self._get_from_key(address)) def get_sorted_from_address(self, address: str) -> List[bytes]: - """ Get a sorted list of transaction hashes of an address - """ - return sorted(self.index[address]) + return list(self._get_sorted_from_key(address)) def is_address_empty(self, address: str) -> bool: - return not bool(self.index[address]) + return self._is_key_empty(address) diff --git a/hathor/indexes/memory_tx_group_index.py b/hathor/indexes/memory_tx_group_index.py new file mode 100644 index 000000000..752e04762 --- /dev/null +++ b/hathor/indexes/memory_tx_group_index.py @@ -0,0 +1,69 @@ +# Copyright 2021 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
class MemoryTxGroupIndex(TxGroupIndex[KT]):
    """In-memory implementation of TxGroupIndex, backed by a dict of sets.

    Abstract: subclasses must implement `_extract_keys` to map a tx to its group keys.
    """

    # maps each group key to the set of hashes of the transactions in that group
    index: DefaultDict[KT, Set[bytes]]

    def __init__(self) -> None:
        self.force_clear()

    def force_clear(self) -> None:
        """Reset the index to an empty state."""
        self.index = defaultdict(set)

    def _add_tx(self, key: KT, tx: BaseTransaction) -> None:
        """Register a single (key, tx) pair."""
        self.index[key].add(not_none(tx.hash))

    @abstractmethod
    def _extract_keys(self, tx: BaseTransaction) -> Iterable[KT]:
        """Extract the keys related to a given tx. The transaction will be added to all extracted keys."""
        raise NotImplementedError

    def add_tx(self, tx: BaseTransaction) -> None:
        """Add the tx to every group returned by `_extract_keys`."""
        assert tx.hash is not None
        for group_key in self._extract_keys(tx):
            self._add_tx(group_key, tx)

    def remove_tx(self, tx: BaseTransaction) -> None:
        """Remove the tx from every group returned by `_extract_keys`."""
        assert tx.hash is not None
        for group_key in self._extract_keys(tx):
            # discard: removing a tx that was never added is a no-op
            self.index[group_key].discard(tx.hash)

    def _get_from_key(self, key: KT) -> Iterable[bytes]:
        """Iterate over the tx hashes stored under `key` (unspecified order)."""
        yield from self.index[key]

    def _get_sorted_from_key(self, key: KT) -> Iterable[bytes]:
        """Return the tx hashes stored under `key`, sorted."""
        return sorted(self.index[key])

    def _is_key_empty(self, key: KT) -> bool:
        """Whether no tx is stored under `key`."""
        return not self.index[key]
+ """ - This index uses rocksdb and the following key format: - - key = [address][tx.timestamp][tx.hash] - |--34b--||--4 bytes---||--32b--| - - It works nicely because rocksdb uses a tree sorted by key under the hood. + _KEY_SIZE = 34 - The timestamp must be serialized in big-endian, so ts1 > ts2 implies that bytes(ts1) > bytes(ts2), - hence the transactions are sorted by timestamp. - """ def __init__(self, db: 'rocksdb.DB', *, cf_name: Optional[bytes] = None, pubsub: Optional['PubSubManager'] = None) -> None: - self.log = logger.new() - RocksDBIndexUtils.__init__(self, db, cf_name or _CF_NAME_ADDRESS_INDEX) + RocksDBTxGroupIndex.__init__(self, db, cf_name or _CF_NAME_ADDRESS_INDEX) self.pubsub = pubsub if self.pubsub: - self.subscribe_pubsub_events() + self._subscribe_pubsub_events() + + def _serialize_key(self, key: str) -> bytes: + return key.encode('ascii') + + def _deserialize_key(self, key_bytes: bytes) -> str: + return key_bytes.decode('ascii') + + def _extract_keys(self, tx: BaseTransaction) -> Iterable[str]: + return tx.get_related_addresses() def get_db_name(self) -> Optional[str]: # XXX: we don't need it to be parametrizable, so this is fine return _DB_NAME - def force_clear(self) -> None: - self.clear() - - def _to_key(self, address: str, tx: Optional[BaseTransaction] = None) -> bytes: - import struct - assert len(address) == 34 - key = address.encode('ascii') - if tx: - assert tx.hash is not None - assert len(tx.hash) == 32 - key += struct.pack('>I', tx.timestamp) + tx.hash - assert len(key) == 34 + 4 + 32 - return key - - def _from_key(self, key: bytes) -> Tuple[str, int, bytes]: - import struct - assert len(key) == 34 + 4 + 32 - address = key[:34].decode('ascii') - timestamp: int - (timestamp,) = struct.unpack('>I', key[34:38]) - tx_hash = key[38:] - assert len(address) == 34 - assert len(tx_hash) == 32 - return address, timestamp, tx_hash - - def subscribe_pubsub_events(self) -> None: - """ Subscribe wallet index to receive voided/winner tx pubsub 
events - """ - assert self.pubsub is not None - # Subscribe to voided/winner events - self.pubsub.subscribe(HathorEvents.CONSENSUS_TX_UPDATE, self.handle_tx_event) - def add_tx(self, tx: BaseTransaction) -> None: - """ Add tx inputs and outputs to the wallet index (indexed by its addresses). - """ - assert tx.hash is not None - - addresses = tx.get_related_addresses() - for address in addresses: - self.log.debug('put address', address=address) - self._db.put((self._cf, self._to_key(address, tx)), b'') - - self.publish_tx(tx, addresses=addresses) - - def remove_tx(self, tx: BaseTransaction) -> None: - """ Remove tx inputs and outputs from the wallet index (indexed by its addresses). - """ - assert tx.hash is not None - - addresses = tx.get_related_addresses() - for address in addresses: - self.log.debug('delete address', address=address) - self._db.delete((self._cf, self._to_key(address, tx))) - - def handle_tx_event(self, key: HathorEvents, args: 'EventArguments') -> None: - """ This method is called when pubsub publishes an event that we subscribed - """ - data = args.__dict__ - tx = data['tx'] - meta = tx.get_metadata() - if meta.has_voided_by_changed_since_last_call() or meta.has_spent_by_changed_since_last_call(): - self.publish_tx(tx) - - def _get_from_address_iter(self, address: str) -> Iterable[bytes]: - self.log.debug('seek to', address=address) - it = self._db.iterkeys(self._cf) - it.seek(self._to_key(address)) - for _cf, key in it: - addr, _, tx_hash = self._from_key(key) - if addr != address: - break - self.log.debug('seek found', tx=tx_hash.hex()) - yield tx_hash - self.log.debug('seek end') + super().add_tx(tx) + self._publish_tx(tx) def get_from_address(self, address: str) -> List[bytes]: - """ Get list of transaction hashes of an address - """ - return list(self._get_from_address_iter(address)) + return list(self._get_from_key(address)) def get_sorted_from_address(self, address: str) -> List[bytes]: - """ Get a sorted list of transaction hashes of an 
address - """ - return list(self._get_from_address_iter(address)) + return list(self._get_sorted_from_key(address)) def is_address_empty(self, address: str) -> bool: - self.log.debug('seek to', address=address) - it = self._db.iterkeys(self._cf) - seek_key = self._to_key(address) - it.seek(seek_key) - cf_key = it.get() - if not cf_key: - return True - _cf, key = cf_key - # XXX: this means we reached the end it did not found any key - if key == seek_key: - return True - addr, _, _ = self._from_key(key) - is_empty = addr != address - self.log.debug('seek empty', is_empty=is_empty) - return is_empty + return self._is_key_empty(address) diff --git a/hathor/indexes/rocksdb_tx_group_index.py b/hathor/indexes/rocksdb_tx_group_index.py new file mode 100644 index 000000000..1706eb3b1 --- /dev/null +++ b/hathor/indexes/rocksdb_tx_group_index.py @@ -0,0 +1,140 @@ +# Copyright 2021 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from abc import abstractmethod +from typing import TYPE_CHECKING, Iterable, Optional, Sized, Tuple, TypeVar + +from structlog import get_logger + +from hathor.indexes.rocksdb_utils import RocksDBIndexUtils +from hathor.indexes.tx_group_index import TxGroupIndex +from hathor.transaction import BaseTransaction + +if TYPE_CHECKING: # pragma: no cover + import rocksdb + +logger = get_logger() + +KT = TypeVar('KT', bound=Sized) + + +class RocksDBTxGroupIndex(TxGroupIndex[KT], RocksDBIndexUtils): + """RocksDB implementation of the TxGroupIndex. 
class RocksDBTxGroupIndex(TxGroupIndex[KT], RocksDBIndexUtils):
    """RocksDB implementation of the TxGroupIndex. This class is abstract and cannot be used directly.

    Current implementation requires all keys to have the same size after serialization.

    This index uses rocksdb and the following key format:

        rocksdb_key = [key ][tx.timestamp][tx.hash]
                      |_KEY_SIZE||--4 bytes---||--32b--|

    It works nicely because rocksdb uses a tree sorted by key under the hood.

    The timestamp must be serialized in big-endian, so ts1 > ts2 implies that bytes(ts1) > bytes(ts2),
    hence the transactions are sorted by timestamp.
    """

    # fixed byte length of a serialized group key; set by subclasses
    _KEY_SIZE: int
    _CF_NAME: bytes

    def __init__(self, db: 'rocksdb.DB', cf_name: bytes) -> None:
        self.log = logger.new()
        RocksDBIndexUtils.__init__(self, db, cf_name)

    def force_clear(self) -> None:
        self.clear()

    @abstractmethod
    def _serialize_key(self, key: KT) -> bytes:
        """Serialize key, so it can be part of RocksDB's key."""
        raise NotImplementedError

    @abstractmethod
    def _deserialize_key(self, _bytes: bytes) -> KT:
        """Deserialize RocksDB's key."""
        raise NotImplementedError

    @abstractmethod
    def _extract_keys(self, tx: BaseTransaction) -> Iterable[KT]:
        """Extract the keys related to a given tx. The transaction will be added to all extracted keys."""
        raise NotImplementedError

    def _to_rocksdb_key(self, key: KT, tx: Optional[BaseTransaction] = None) -> bytes:
        """Build the rocksdb key: serialized group key, optionally followed by timestamp+hash."""
        import struct
        rocksdb_key = self._serialize_key(key)
        assert len(rocksdb_key) == self._KEY_SIZE
        if tx:
            assert tx.hash is not None
            assert len(tx.hash) == 32
            # big-endian timestamp keeps rocksdb's key ordering == timestamp ordering
            rocksdb_key += struct.pack('>I', tx.timestamp) + tx.hash
            assert len(rocksdb_key) == self._KEY_SIZE + 4 + 32
        return rocksdb_key

    def _from_rocksdb_key(self, rocksdb_key: bytes) -> Tuple[KT, int, bytes]:
        """Split a full rocksdb key back into (group key, timestamp, tx hash)."""
        import struct
        assert len(rocksdb_key) == self._KEY_SIZE + 4 + 32
        key = self._deserialize_key(rocksdb_key[:self._KEY_SIZE])
        timestamp: int
        (timestamp,) = struct.unpack('>I', rocksdb_key[self._KEY_SIZE:self._KEY_SIZE + 4])
        tx_hash = rocksdb_key[self._KEY_SIZE + 4:]
        # NOTE(review): the deserialized key's length may differ from _KEY_SIZE
        # (that is the serialized size) — possibly worth a _SERIALIZED_KEY_SIZE split.
        assert len(tx_hash) == 32
        return key, timestamp, tx_hash

    def add_tx(self, tx: BaseTransaction) -> None:
        """Add one rocksdb entry per key extracted from the tx."""
        assert tx.hash is not None
        for group_key in self._extract_keys(tx):
            self.log.debug('put key', key=group_key)
            self._db.put((self._cf, self._to_rocksdb_key(group_key, tx)), b'')

    def remove_tx(self, tx: BaseTransaction) -> None:
        """Delete every rocksdb entry of the tx (one per extracted key)."""
        assert tx.hash is not None
        for group_key in self._extract_keys(tx):
            self.log.debug('delete key', key=group_key)
            self._db.delete((self._cf, self._to_rocksdb_key(group_key, tx)))

    def _get_from_key(self, key: KT) -> Iterable[bytes]:
        """Yield the tx hashes of a group by seeking to its key prefix and scanning forward."""
        self.log.debug('seek to', key=key)
        it = self._db.iterkeys(self._cf)
        it.seek(self._to_rocksdb_key(key))
        for _cf, rocksdb_key in it:
            found_key, _, tx_hash = self._from_rocksdb_key(rocksdb_key)
            if found_key != key:
                # left the prefix range: no more entries for this group
                break
            self.log.debug('seek found', tx=tx_hash.hex())
            yield tx_hash
        self.log.debug('seek end')

    def _get_sorted_from_key(self, key: KT) -> Iterable[bytes]:
        # rocksdb iterates keys in sorted order and the timestamp is big-endian,
        # so the plain scan is already sorted by timestamp
        return self._get_from_key(key)

    def _is_key_empty(self, key: KT) -> bool:
        """Whether the group has no entries: seek to the prefix and inspect the first key."""
        self.log.debug('seek to', key=key)
        it = self._db.iterkeys(self._cf)
        seek_key = self._to_rocksdb_key(key)
        it.seek(seek_key)
        cf_key = it.get()
        if not cf_key:
            return True
        _cf, rocksdb_key = cf_key
        # XXX: reaching the bare prefix means the seek found no full entry for any key
        if rocksdb_key == seek_key:
            return True
        found_key, _, _ = self._from_rocksdb_key(rocksdb_key)
        is_empty = found_key != key
        self.log.debug('seek empty', is_empty=is_empty)
        return is_empty
class TxGroupIndex(BaseIndex, Generic[KT]):
    """Abstract index that groups transactions by key. A transaction may belong to several
    groups at once — e.g. grouped by address, a tx with five distinct addresses is added
    to five groups.

    Concrete implementations must define how the list of keys is extracted from each tx.
    """

    @abstractmethod
    def add_tx(self, tx: BaseTransaction) -> None:
        """Add tx to this index."""
        raise NotImplementedError

    @abstractmethod
    def remove_tx(self, tx: BaseTransaction) -> None:
        """Remove tx from this index."""
        raise NotImplementedError

    @abstractmethod
    def _get_from_key(self, key: KT) -> Iterable[bytes]:
        """Get all transactions that have a given key."""
        raise NotImplementedError

    @abstractmethod
    def _get_sorted_from_key(self, key: KT) -> Iterable[bytes]:
        """Get all transactions that have a given key, sorted by timestamp."""
        raise NotImplementedError

    @abstractmethod
    def _is_key_empty(self, key: KT) -> bool:
        """Check whether a key is empty."""
        raise NotImplementedError