diff --git a/Makefile b/Makefile index 65c2b2032..ea5ba1963 100644 --- a/Makefile +++ b/Makefile @@ -82,7 +82,9 @@ isort: proto_dir = ./hathor/protos proto_srcs = $(wildcard $(proto_dir)/*.proto) -proto_outputs = $(patsubst %.proto,%_pb2.py,$(proto_srcs)) $(patsubst %.proto,%_pb2_grpc.py,$(proto_srcs)) $(patsubst %.proto,%_pb2.pyi,$(proto_srcs)) +proto_srcs_old = $(proto_dir)/transaction_storage.proto +proto_outputs = $(patsubst %.proto,%_pb2.py,$(proto_srcs)) $(patsubst %.proto,%_pb2.pyi,$(proto_srcs)) +proto_outputs_old = $(patsubst %.proto,%_pb2_grpc.py,$(proto_srcs) $(proto_srcs_old)) GRPC_TOOLS_VERSION = "$(shell python -m grpc_tools.protoc --version 2>/dev/null || true)" #ifdef GRPC_TOOLS_VERSION ifneq ($(GRPC_TOOLS_VERSION),"") @@ -94,8 +96,6 @@ endif # all proto_srcs are added as deps so when a change on any of them triggers all to be rebuilt %_pb2.pyi %_pb2.py: %.proto $(proto_srcs) $(protoc) -I. --python_out=. --mypy_out=. $< -%_pb2_grpc.py: %.proto $(proto_srcs) - $(protoc) -I. --grpc_python_out=. $< || true .PHONY: protos protos: $(proto_outputs) @@ -104,7 +104,7 @@ protos: $(proto_outputs) .PHONY: clean-protos clean-protos: - rm -f $(proto_outputs) + rm -f $(proto_outputs) $(proto_outputs_old) .PHONY: clean-pyc clean-pyc: diff --git a/hathor/protos/__init__.py b/hathor/protos/__init__.py index b6f58c343..b0a453ad1 100644 --- a/hathor/protos/__init__.py +++ b/hathor/protos/__init__.py @@ -22,56 +22,6 @@ TxInput, TxOutput, ) -from hathor.protos.transaction_storage_pb2 import ( - ANY_ORDER, - ANY_TYPE, - ASC_ORDER, - BLOCK_TYPE, - FOR_CACHING, - LEFT_RIGHT_ORDER_CHILDREN, - LEFT_RIGHT_ORDER_SPENT, - NO_FILTER, - ONLY_NEWER, - ONLY_OLDER, - TOPOLOGICAL_ORDER, - TRANSACTION_TYPE, - AddValueRequest, - CountRequest, - CountResponse, - Empty, - ExistsRequest, - ExistsResponse, - FirstTimestampRequest, - FirstTimestampResponse, - GetRequest, - GetResponse, - GetValueRequest, - GetValueResponse, - Interval, - LatestTimestampRequest, - LatestTimestampResponse, - ListItemResponse, - ListNewestRequest, - ListRequest, - ListTipsRequest, - MarkAsRequest, - MarkAsResponse, - RemoveRequest, - RemoveResponse, - RemoveValueRequest, - SaveRequest, - SaveResponse, - SortedTxsRequest, -) - -try: - from hathor.protos.transaction_storage_pb2_grpc import ( - TransactionStorageServicer, - TransactionStorageStub, - add_TransactionStorageServicer_to_server, - ) -except ImportError: - pass __all__ = [ 'BaseTransaction', @@ -81,50 +31,6 @@ 'TxOutput', 'BitcoinAuxPow', 'Metadata', - 'ExistsRequest', - 'ExistsResponse', - 'GetRequest', - 'GetResponse', - 'SaveRequest', - 'SaveResponse', - 'RemoveRequest', - 'RemoveResponse', - 'CountRequest', - 'CountResponse', - 'LatestTimestampRequest', - 'LatestTimestampResponse', - 'AddValueRequest', - 'GetValueRequest', - 'GetValueResponse', - 'RemoveValueRequest', - 'Empty', - 'FirstTimestampRequest', - 'FirstTimestampResponse', - 'MarkAsRequest', - 'MarkAsResponse', - 'ListRequest', - 'ListTipsRequest', - 'ListNewestRequest', - 'ListItemResponse', - 'Interval', - 'SortedTxsRequest', 'TokenCreationTransaction', - 'TransactionStorageStub', - 'TransactionStorageServicer', - 'ANY_TYPE', - 'TRANSACTION_TYPE', - 'BLOCK_TYPE', - 'NO_FILTER', - 'ONLY_NEWER', - 'ONLY_OLDER', - 'ANY_ORDER', - 'ASC_ORDER', - 'TOPOLOGICAL_ORDER', - 'ONLY_NEWER', - 'ONLY_OLDER', - 'FOR_CACHING', - 'LEFT_RIGHT_ORDER_CHILDREN', - 'LEFT_RIGHT_ORDER_SPENT', 'VOIDED', - 'add_TransactionStorageServicer_to_server', ] diff --git a/hathor/protos/transaction_storage.proto b/hathor/protos/transaction_storage.proto deleted file mode 100644 index b22f44768..000000000 --- a/hathor/protos/transaction_storage.proto +++ /dev/null @@ -1,180 +0,0 @@ -syntax = "proto3"; - -package hathor; - -import "hathor/protos/transaction.proto"; - -service TransactionStorage { - rpc Exists(ExistsRequest) returns (ExistsResponse) {} - rpc Get(GetRequest) returns (GetResponse) {} - rpc Save(SaveRequest) returns (SaveResponse) {} - rpc Remove(RemoveRequest) returns (RemoveResponse) {} - rpc Count(CountRequest) returns (CountResponse) {} - rpc LatestTimestamp(LatestTimestampRequest) returns (LatestTimestampResponse) {} - rpc FirstTimestamp(FirstTimestampRequest) returns (FirstTimestampResponse) {} - rpc MarkAs(MarkAsRequest) returns (MarkAsResponse) {} - rpc List(ListRequest) returns (stream ListItemResponse) {} - rpc ListTips(ListTipsRequest) returns (stream Interval) {} - rpc ListNewest(ListNewestRequest) returns (stream ListItemResponse) {} - rpc SortedTxs(SortedTxsRequest) returns (stream Transaction) {} - rpc AddValue(AddValueRequest) returns (Empty) {} - rpc RemoveValue(RemoveValueRequest) returns (Empty) {} - rpc GetValue(GetValueRequest) returns (GetValueResponse) {} -} - -message ExistsRequest { - bytes hash = 1; -} - -message ExistsResponse { - bool exists = 1; -} - -message GetRequest { - bytes hash = 1; - bool exclude_metadata = 2; -} - -message GetResponse { - BaseTransaction transaction = 1; -} - -message SaveRequest { - BaseTransaction transaction = 1; - bool only_metadata = 2; -} - -message SaveResponse { - bool saved = 1; -} - -message RemoveRequest { - BaseTransaction transaction = 1; -} - -message RemoveResponse { - bool removed = 1; -} - -enum TxType { - ANY_TYPE = 0; - TRANSACTION_TYPE = 1; - BLOCK_TYPE = 2; -} - -message CountRequest { - TxType tx_type = 1; -} - -message CountResponse { - uint64 count = 1; -} - -message LatestTimestampRequest { -} - -message LatestTimestampResponse { - uint32 timestamp = 1; -} - -message FirstTimestampRequest { -} - -message FirstTimestampResponse { - uint32 timestamp = 1; -} - -enum MarkType { - FOR_CACHING = 0; -} - -message MarkAsRequest { - BaseTransaction transaction = 1; - MarkType mark_type = 2; - bool remove_mark = 3; - bool relax_assert = 4; -} - -message MarkAsResponse { - bool marked = 1; -} - -enum TimeFilterType { - NO_FILTER = 0; - ONLY_NEWER = 1; - ONLY_OLDER = 2; -} - -enum OrderBy { - ANY_ORDER = 0; - ASC_ORDER = 1; - TOPOLOGICAL_ORDER = 2; - LEFT_RIGHT_ORDER_CHILDREN = 3; - LEFT_RIGHT_ORDER_SPENT = 4; -} - -message ListRequest { - bool exclude_metadata = 1; - TxType tx_type = 2; - TimeFilterType time_filter = 3; - uint32 timestamp = 4; - OrderBy order_by = 5; - bool filter_before = 6; - oneof tx_oneof { - bytes hash = 7; - BaseTransaction tx = 9; - } - uint64 max_count = 8; -} - -message ListTipsRequest { - // optional timestamp, `oneof` used to differentiate unset (None) from 0 - oneof timestamp_oneof { - double timestamp = 1; - } - TxType tx_type = 2; -} - -message Interval { - double begin = 1; - double end = 2; - bytes data = 3; -} - -message ListNewestRequest { - uint64 count = 1; - TxType tx_type = 2; -} - -message ListItemResponse { - oneof list_item_oneof { - BaseTransaction transaction = 1; - bool has_more = 2; - } -} - -message SortedTxsRequest { - uint32 timestamp = 1; - uint32 count = 2; - uint32 offset = 3; -} - -message AddValueRequest { - string key = 1; - string value = 2; -} - -message RemoveValueRequest { - string key = 1; -} - -message GetValueRequest { - string key = 1; -} - -message GetValueResponse { - string value = 1; -} - -message Empty { -} diff --git a/hathor/transaction/storage/__init__.py b/hathor/transaction/storage/__init__.py index 255453592..3be3ee06b 100644 --- a/hathor/transaction/storage/__init__.py +++ b/hathor/transaction/storage/__init__.py @@ -18,12 +18,6 @@ from hathor.transaction.storage.memory_storage import TransactionMemoryStorage from hathor.transaction.storage.transaction_storage import TransactionStorage -try: - from hathor.transaction.storage.remote_storage import TransactionRemoteStorage, create_transaction_storage_server - from hathor.transaction.storage.subprocess_storage import TransactionSubprocessStorage -except ImportError: - pass - try: from hathor.transaction.storage.rocksdb_storage import TransactionRocksDBStorage except ImportError: @@ -35,8 +29,5 @@ 'TransactionCompactStorage', 'TransactionCacheStorage', 'TransactionBinaryStorage', - 'TransactionSubprocessStorage', - 'TransactionRemoteStorage', 'TransactionRocksDBStorage', - 'create_transaction_storage_server', ] diff --git a/hathor/transaction/storage/binary_storage.py b/hathor/transaction/storage/binary_storage.py index 5a63d7f21..c8a801e80 100644 --- a/hathor/transaction/storage/binary_storage.py +++ b/hathor/transaction/storage/binary_storage.py @@ -23,14 +23,14 @@ TransactionDoesNotExist, TransactionMetadataDoesNotExist, ) -from hathor.transaction.storage.transaction_storage import BaseTransactionStorage, TransactionStorageAsyncFromSync +from hathor.transaction.storage.transaction_storage import BaseTransactionStorage from hathor.transaction.transaction_metadata import TransactionMetadata if TYPE_CHECKING: from hathor.transaction import BaseTransaction -class TransactionBinaryStorage(BaseTransactionStorage, TransactionStorageAsyncFromSync): +class TransactionBinaryStorage(BaseTransactionStorage): def __init__(self, path='./', with_index=True): self.tx_path = os.path.join(path, 'tx') os.makedirs(self.tx_path, exist_ok=True) diff --git a/hathor/transaction/storage/cache_storage.py b/hathor/transaction/storage/cache_storage.py index a5067e739..852c8061a 100644 --- a/hathor/transaction/storage/cache_storage.py +++ b/hathor/transaction/storage/cache_storage.py @@ -13,10 +13,9 @@ # limitations under the License. from collections import OrderedDict -from typing import TYPE_CHECKING, Any, Generator, Iterator, Optional, Set +from typing import TYPE_CHECKING, Any, Optional, Set from twisted.internet import threads -from twisted.internet.defer import Deferred, inlineCallbacks, succeed from hathor.transaction import BaseTransaction from hathor.transaction.storage.transaction_storage import BaseTransactionStorage @@ -191,56 +190,6 @@ def get_count_tx_blocks(self) -> int: self._flush_to_storage(self.dirty_txs.copy()) return self.store.get_count_tx_blocks() - @inlineCallbacks - def save_transaction_deferred(self, tx: BaseTransaction, *, only_metadata: bool = False) -> Iterator[Deferred]: - # TODO: yield self._save_transaction_deferred - self._save_transaction(tx) - - # call super which adds to index if needed - yield super().save_transaction_deferred(tx) - - @inlineCallbacks - def remove_transaction_deferred(self, tx: BaseTransaction) -> Iterator[Deferred]: - yield super().remove_transaction_deferred(tx) - - def transaction_exists_deferred(self, hash_bytes: bytes) -> Deferred: - if hash_bytes in self.cache: - return succeed(True) - return self.store.transaction_exists_deferred(hash_bytes) - - @inlineCallbacks - def get_transaction_deferred(self, hash_bytes: bytes) -> Generator[Deferred, Any, BaseTransaction]: - if hash_bytes in self.cache: - tx = self._clone(self.cache[hash_bytes]) - self.cache.move_to_end(hash_bytes, last=True) - self.stats['hit'] += 1 - return tx - else: - tx = yield self.store.get_transaction_deferred(hash_bytes) - # TODO: yield self._update_cache_deferred(tx) - self._update_cache(tx) - self.stats['miss'] += 1 - return tx - - @inlineCallbacks - def get_all_transactions_deferred(self): - # TODO: yield self._flush_to_storage_deferred(self.dirty_txs.copy()) - self._flush_to_storage(self.dirty_txs.copy()) - all_transactions = yield self.store.get_all_transactions_deferred() - - def _mygenerator(): - for tx in all_transactions: - tx.storage = self - yield tx - return _mygenerator() - - @inlineCallbacks - def get_count_tx_blocks_deferred(self): - # TODO: yield self._flush_to_storage_deferred(self.dirty_txs.copy()) - self._flush_to_storage(self.dirty_txs.copy()) - res = yield self.store.get_count_tx_blocks_deferred() - return res - def add_value(self, key: str, value: str) -> None: self.store.add_value(key, value) diff --git a/hathor/transaction/storage/compact_storage.py b/hathor/transaction/storage/compact_storage.py index 63254f396..e97dfc495 100644 --- a/hathor/transaction/storage/compact_storage.py +++ b/hathor/transaction/storage/compact_storage.py @@ -21,7 +21,7 @@ from hathor.conf import HathorSettings from hathor.transaction.storage.exceptions import TransactionDoesNotExist -from hathor.transaction.storage.transaction_storage import BaseTransactionStorage, TransactionStorageAsyncFromSync +from hathor.transaction.storage.transaction_storage import BaseTransactionStorage from hathor.transaction.transaction_metadata import TransactionMetadata if TYPE_CHECKING: @@ -30,7 +30,7 @@ settings = HathorSettings() -class TransactionCompactStorage(BaseTransactionStorage, TransactionStorageAsyncFromSync): +class TransactionCompactStorage(BaseTransactionStorage): """This storage saves tx and metadata in the same file. It also uses JSON format. Saved file is of format {'tx': {...}, 'meta': {...}} diff --git a/hathor/transaction/storage/memory_storage.py b/hathor/transaction/storage/memory_storage.py index 13c1477b2..5a4f350be 100644 --- a/hathor/transaction/storage/memory_storage.py +++ b/hathor/transaction/storage/memory_storage.py @@ -15,14 +15,14 @@ from typing import Any, Dict, Iterator, Optional, TypeVar from hathor.transaction.storage.exceptions import TransactionDoesNotExist -from hathor.transaction.storage.transaction_storage import BaseTransactionStorage, TransactionStorageAsyncFromSync +from hathor.transaction.storage.transaction_storage import BaseTransactionStorage from hathor.transaction.transaction import BaseTransaction from hathor.transaction.transaction_metadata import TransactionMetadata _Clonable = TypeVar('_Clonable', BaseTransaction, TransactionMetadata) -class TransactionMemoryStorage(BaseTransactionStorage, TransactionStorageAsyncFromSync): +class TransactionMemoryStorage(BaseTransactionStorage): def __init__(self, with_index: bool = True, *, _clone_if_needed: bool = False) -> None: """ :param _clone_if_needed: *private parameter*, defaults to True, controls whether to clone diff --git a/hathor/transaction/storage/remote_storage.py b/hathor/transaction/storage/remote_storage.py deleted file mode 100644 index 780ce4486..000000000 --- a/hathor/transaction/storage/remote_storage.py +++ /dev/null @@ -1,891 +0,0 @@ -# Copyright 2021 Hathor Labs -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from math import inf -from typing import TYPE_CHECKING, Any, Callable, Dict, Generator, Iterator, List, Optional, Set, Tuple, Union - -import grpc -from grpc._server import _Context -from intervaltree import Interval -from structlog import get_logger -from twisted.internet.defer import Deferred, inlineCallbacks - -from hathor import protos -from hathor.exception import HathorError -from hathor.indexes import TransactionIndexElement, TransactionsIndex -from hathor.transaction import Block -from hathor.transaction.storage.exceptions import TransactionDoesNotExist -from hathor.transaction.storage.transaction_storage import AllTipsCache, TransactionStorage - -if TYPE_CHECKING: - from hathor.transaction import BaseTransaction # noqa: F401 - -logger = get_logger() - - -class RemoteCommunicationError(HathorError): - pass - - -def convert_grpc_exceptions(func: Callable) -> Callable: - """Decorator to catch and conver grpc exceptions for hathor expections. - """ - from functools import wraps - - @wraps(func) - def wrapper(*args, **kwargs): - try: - return func(*args, **kwargs) - except grpc.RpcError as e: - if e.code() == grpc.StatusCode.NOT_FOUND: - raise TransactionDoesNotExist - else: - raise RemoteCommunicationError from e - - return wrapper - - -def convert_grpc_exceptions_generator(func: Callable) -> Callable: - """Decorator to catch and conver grpc excpetions for hathor expections. (for generators) - """ - from functools import wraps - - @wraps(func) - def wrapper(*args, **kwargs): - try: - yield from func(*args, **kwargs) - except grpc.RpcError as e: - if e.code() == grpc.StatusCode.NOT_FOUND: - raise TransactionDoesNotExist - else: - raise RemoteCommunicationError from e - - return wrapper - - -def convert_hathor_exceptions(func: Callable) -> Callable: - """Decorator to annotate better details and codes on the grpc context for known exceptions. - """ - from functools import wraps - - @wraps(func) - def wrapper(self: Any, request: Any, context: _Context) -> Any: - try: - return func(self, request, context) - except TransactionDoesNotExist: - context.set_code(grpc.StatusCode.NOT_FOUND) - context.set_details('Transaction does not exist.') - raise - - return wrapper - - -def convert_hathor_exceptions_generator(func: Callable) -> Callable: - """Decorator to annotate better details and codes on the grpc context for known exceptions. (for generators) - """ - from functools import wraps - - @wraps(func) - def wrapper(self: Any, request: Any, context: _Context) -> Iterator: - try: - yield from func(self, request, context) - except TransactionDoesNotExist: - context.set_code(grpc.StatusCode.NOT_FOUND) - context.set_details('Transaction does not exist.') - raise - - return wrapper - - -class TransactionRemoteStorage(TransactionStorage): - """Connects to a Storage API Server at given port and exposes standard storage interface. - """ - - def __init__(self, with_index=None): - super().__init__() - self._channel = None - self.with_index = with_index - # Set initial value for _best_block_tips cache. - self._best_block_tips = [] - - def connect_to(self, port: int) -> None: - if self._channel: - self._channel.close() - self._channel = grpc.insecure_channel('127.0.0.1:{}'.format(port)) - self._stub = protos.TransactionStorageStub(self._channel) - - # Initialize genesis. - self._save_or_verify_genesis() - - # Set initial value for _best_block_tips cache. - self._best_block_tips = [x.hash for x in self.get_all_genesis() if x.is_block] - - def _check_connection(self) -> None: - """raise error if not connected""" - from .subprocess_storage import SubprocessNotAliveError - if not self._channel: - raise SubprocessNotAliveError('subprocess not started') - - # TransactionStorageSync interface implementation: - - @convert_grpc_exceptions - def remove_transaction(self, tx: 'BaseTransaction') -> None: - self._check_connection() - - tx_proto = tx.to_proto() - request = protos.RemoveRequest(transaction=tx_proto) - result = self._stub.Remove(request) # noqa: F841 - assert result.removed - self._remove_from_weakref(tx) - - @convert_grpc_exceptions - def save_transaction(self, tx: 'BaseTransaction', *, only_metadata: bool = False) -> None: - self._check_connection() - - tx_proto = tx.to_proto() - request = protos.SaveRequest(transaction=tx_proto, only_metadata=only_metadata) - result = self._stub.Save(request) # noqa: F841 - assert result.saved - self._save_to_weakref(tx) - - @convert_grpc_exceptions - def transaction_exists(self, hash_bytes: bytes) -> bool: - self._check_connection() - request = protos.ExistsRequest(hash=hash_bytes) - result = self._stub.Exists(request) - return result.exists - - @convert_grpc_exceptions - def _get_transaction(self, hash_bytes: bytes) -> 'BaseTransaction': - tx = self.get_transaction_from_weakref(hash_bytes) - if tx is not None: - return tx - - from hathor.transaction import tx_or_block_from_proto - self._check_connection() - request = protos.GetRequest(hash=hash_bytes) - result = self._stub.Get(request) - - tx = tx_or_block_from_proto(result.transaction, storage=self) - self._save_to_weakref(tx) - return tx - - @convert_grpc_exceptions_generator - def get_all_transactions(self) -> Iterator['BaseTransaction']: - yield from self._call_list_request_generators() - - @convert_grpc_exceptions - def get_count_tx_blocks(self) -> int: - self._check_connection() - request = protos.CountRequest(tx_type=protos.ANY_TYPE) - result = self._stub.Count(request) - return result.count - - # TransactionStorageAsync interface implementation: - - @convert_grpc_exceptions - def save_transaction_deferred(self, tx: 'BaseTransaction', *, only_metadata: bool = False) -> None: - # self._check_connection() - raise NotImplementedError - - @convert_grpc_exceptions - def remove_transaction_deferred(self, tx: 'BaseTransaction') -> None: - # self._check_connection() - raise NotImplementedError - - @inlineCallbacks - @convert_grpc_exceptions_generator - def transaction_exists_deferred(self, hash_bytes: bytes) -> Generator[None, protos.ExistsResponse, bool]: - self._check_connection() - request = protos.ExistsRequest(hash=hash_bytes) - result = yield Deferred.fromFuture(self._stub.Exists.future(request)) - return result.exists - - @convert_grpc_exceptions - def get_transaction_deferred(self, hash_bytes: bytes) -> Deferred: - # self._check_connection() - raise NotImplementedError - - @convert_grpc_exceptions - def get_all_transactions_deferred(self) -> Deferred: - # self._check_connection() - raise NotImplementedError - - @convert_grpc_exceptions - def get_count_tx_blocks_deferred(self) -> Deferred: - # self._check_connection() - raise NotImplementedError - - # TransactionStorage interface implementation: - - @property - def latest_timestamp(self) -> int: - return self._latest_timestamp() - - @convert_grpc_exceptions - def _latest_timestamp(self) -> int: - self._check_connection() - request = protos.LatestTimestampRequest() - result = self._stub.LatestTimestamp(request) - return result.timestamp - - @property - def first_timestamp(self) -> int: - if not hasattr(self, '_cached_first_timestamp'): - timestamp = self._first_timestamp() - if timestamp: - setattr(self, '_cached_first_timestamp', timestamp) - return getattr(self, '_cached_first_timestamp', None) - - @convert_grpc_exceptions - def _first_timestamp(self) -> int: - self._check_connection() - request = protos.FirstTimestampRequest() - result = self._stub.FirstTimestamp(request) - return result.timestamp - - def get_best_block_tips(self, timestamp: Optional[float] = None, *, skip_cache: bool = False) -> List[bytes]: - return super().get_best_block_tips(timestamp, skip_cache=skip_cache) - - @convert_grpc_exceptions - def get_all_tips(self, timestamp: Optional[Union[int, float]] = None) -> Set[Interval]: - self._check_connection() - if isinstance(timestamp, float) and timestamp != inf: - self.log.warn('timestamp given in float will be truncated, use int instead') - timestamp = int(timestamp) - - if self._all_tips_cache is not None and timestamp is not None and timestamp >= self._all_tips_cache.timestamp: - return self._all_tips_cache.tips - - request = protos.ListTipsRequest(tx_type=protos.ANY_TYPE, timestamp=timestamp) - result = self._stub.ListTips(request) - tips = set() - for interval_proto in result: - tips.add(Interval(interval_proto.begin, interval_proto.end, interval_proto.data)) - - if timestamp is not None and timestamp >= self.latest_timestamp: - merkle_tree, hashes = self.calculate_merkle_tree(tips) - self._all_tips_cache = AllTipsCache(self.latest_timestamp, tips, merkle_tree, hashes) - - return tips - - @convert_grpc_exceptions - def get_block_tips(self, timestamp: Optional[float] = None) -> Set[Interval]: - self._check_connection() - if isinstance(timestamp, float) and timestamp != inf: - self.log.warn('timestamp given in float will be truncated, use int instead') - timestamp = int(timestamp) - request = protos.ListTipsRequest(tx_type=protos.BLOCK_TYPE, timestamp=timestamp) - result = self._stub.ListTips(request) - tips = set() - for interval_proto in result: - tips.add(Interval(interval_proto.begin, interval_proto.end, interval_proto.data)) - return tips - - @convert_grpc_exceptions - def get_tx_tips(self, timestamp: Optional[float] = None) -> Set[Interval]: - self._check_connection() - if isinstance(timestamp, float) and timestamp != inf: - self.log.warn('timestamp given in float will be truncated, use int instead') - timestamp = int(timestamp) - request = protos.ListTipsRequest(tx_type=protos.TRANSACTION_TYPE, timestamp=timestamp) - result = self._stub.ListTips(request) - tips = set() - for interval_proto in result: - tips.add(Interval(interval_proto.begin, interval_proto.end, interval_proto.data)) - return tips - - @convert_grpc_exceptions - def get_newest_blocks(self, count: int) -> Tuple[List['Block'], bool]: - from hathor.transaction import tx_or_block_from_proto - self._check_connection() - request = protos.ListNewestRequest(tx_type=protos.BLOCK_TYPE, count=count) - result = self._stub.ListNewest(request) - tx_list: List['Block'] = [] - has_more = None - for list_item in result: - if list_item.HasField('transaction'): - tx_proto = list_item.transaction - blk = tx_or_block_from_proto(tx_proto, storage=self) - assert isinstance(blk, Block) - tx_list.append(blk) - elif list_item.HasField('has_more'): - has_more = list_item.has_more - # assuming there are no more items after `has_more`, break soon - break - else: - raise ValueError('unexpected list_item_oneof') - assert isinstance(has_more, bool) - return tx_list, has_more - - @convert_grpc_exceptions - def get_newest_txs(self, count: int) -> Tuple[List['BaseTransaction'], bool]: - from hathor.transaction import tx_or_block_from_proto - self._check_connection() - request = protos.ListNewestRequest(tx_type=protos.TRANSACTION_TYPE, count=count) - result = self._stub.ListNewest(request) - tx_list = [] - has_more = None - for list_item in result: - if list_item.HasField('transaction'): - tx_proto = list_item.transaction - tx_list.append(tx_or_block_from_proto(tx_proto, storage=self)) - elif list_item.HasField('has_more'): - has_more = list_item.has_more - # assuming there are no more items after `has_more`, break soon - break - else: - raise ValueError('unexpected list_item_oneof') - assert has_more is not None - return tx_list, has_more - - @convert_grpc_exceptions - def get_older_blocks_after(self, timestamp: int, hash_bytes: bytes, - count: int) -> Tuple[List['BaseTransaction'], bool]: - from hathor.transaction import tx_or_block_from_proto - self._check_connection() - if isinstance(timestamp, float): - self.log.warn('timestamp given in float will be truncated, use int instead') - timestamp = int(timestamp) - request = protos.ListRequest( - tx_type=protos.BLOCK_TYPE, - time_filter=protos.ONLY_OLDER, - timestamp=timestamp, - hash=hash_bytes, - max_count=count, - ) - result = self._stub.List(request) - tx_list = [] - has_more = None - for list_item in result: - if list_item.HasField('transaction'): - tx_proto = list_item.transaction - tx_list.append(tx_or_block_from_proto(tx_proto, storage=self)) - elif list_item.HasField('has_more'): - has_more = list_item.has_more - # assuming there are no more items after `has_more`, break soon - break - else: - raise ValueError('unexpected list_item_oneof') - assert has_more is not None - return tx_list, has_more - - @convert_grpc_exceptions - def get_newer_blocks_after(self, timestamp: int, hash_bytes: bytes, - count: int) -> Tuple[List['BaseTransaction'], bool]: - from hathor.transaction import tx_or_block_from_proto - self._check_connection() - if isinstance(timestamp, float): - self.log.warn('timestamp given in float will be truncated, use int instead') - timestamp = int(timestamp) - request = protos.ListRequest( - tx_type=protos.BLOCK_TYPE, - time_filter=protos.ONLY_NEWER, - timestamp=timestamp, - hash=hash_bytes, - max_count=count, - ) - result = self._stub.List(request) - tx_list = [] - has_more = None - for list_item in result: - if list_item.HasField('transaction'): - tx_proto = list_item.transaction - tx_list.append(tx_or_block_from_proto(tx_proto, storage=self)) - elif list_item.HasField('has_more'): - has_more = list_item.has_more - # assuming there are no more items after `has_more`, break soon - break - else: - raise ValueError('unexpected list_item_oneof') - assert has_more is not None - return tx_list, has_more - - @convert_grpc_exceptions - def get_older_txs_after(self, timestamp: int, hash_bytes: bytes, - count: int) -> Tuple[List['BaseTransaction'], bool]: - from hathor.transaction import tx_or_block_from_proto - self._check_connection() - if isinstance(timestamp, float): - self.log.warn('timestamp given in float will be truncated, use int instead') - timestamp = int(timestamp) - request = protos.ListRequest( - tx_type=protos.TRANSACTION_TYPE, - time_filter=protos.ONLY_OLDER, - timestamp=timestamp, - hash=hash_bytes, - max_count=count, - ) - result = self._stub.List(request) - tx_list = [] - has_more = None - for list_item in result: - if list_item.HasField('transaction'): - tx_proto = list_item.transaction - tx_list.append(tx_or_block_from_proto(tx_proto, storage=self)) - elif list_item.HasField('has_more'): - has_more = list_item.has_more - # assuming there are no more items after `has_more`, break soon - break - else: - raise ValueError('unexpected list_item_oneof') - assert has_more is not None - return tx_list, has_more - - @convert_grpc_exceptions - def get_newer_txs_after(self, timestamp: int, hash_bytes: bytes, - count: int) -> Tuple[List['BaseTransaction'], bool]: - from hathor.transaction import tx_or_block_from_proto - self._check_connection() - if isinstance(timestamp, float): - self.log.warn('timestamp given in float will be truncated, use int instead') - timestamp = int(timestamp) - request = protos.ListRequest( - tx_type=protos.TRANSACTION_TYPE, - time_filter=protos.ONLY_NEWER, - timestamp=timestamp, - hash=hash_bytes, - max_count=count, - ) - result = self._stub.List(request) - tx_list = [] - has_more = None - for list_item in result: - if list_item.HasField('transaction'): - tx_proto = list_item.transaction - tx_list.append(tx_or_block_from_proto(tx_proto, storage=self)) - elif list_item.HasField('has_more'): - has_more = list_item.has_more - # assuming there are no more items after `has_more`, break soon - break - else: - raise ValueError('unexpected list_item_oneof') - assert has_more is not None - return tx_list, has_more - - def _manually_initialize(self) -> None: - pass - - @convert_grpc_exceptions_generator - def _call_list_request_generators(self, kwargs: Optional[Dict[str, Any]] = None) -> Iterator['BaseTransaction']: - """ Execute a call for the ListRequest and yield the blocks or txs - - :param kwargs: Parameters to be sent to ListRequest - :type kwargs: Dict[str,] - """ - from hathor.transaction import tx_or_block_from_proto - - def get_tx(tx): - tx2 = self.get_transaction_from_weakref(tx.hash) - if tx2: - tx = tx2 - else: - self._save_to_weakref(tx) - return tx - - self._check_connection() - if kwargs: - request = protos.ListRequest(**kwargs) - else: - request = protos.ListRequest() - - result = self._stub.List(request) - for list_item in result: - if not list_item.HasField('transaction'): - break - tx_proto = list_item.transaction - tx = tx_or_block_from_proto(tx_proto, storage=self) - assert tx.hash is not None - lock = self._get_lock(tx.hash) - - if lock: - with lock: - tx = get_tx(tx) - else: - tx = get_tx(tx) - yield tx - - @convert_grpc_exceptions_generator - def _topological_sort(self): - yield from self._call_list_request_generators({'order_by': protos.TOPOLOGICAL_ORDER}) - - @convert_grpc_exceptions - def _add_to_cache(self, tx): - self._check_connection() - tx_proto = tx.to_proto() - request = protos.MarkAsRequest(transaction=tx_proto, mark_type=protos.FOR_CACHING, relax_assert=False) - result = self._stub.MarkAs(request) # noqa: F841 - - @convert_grpc_exceptions - def _del_from_cache(self, tx: 'BaseTransaction', *, relax_assert: bool = False) -> None: - self._check_connection() - tx_proto = tx.to_proto() - request = protos.MarkAsRequest(transaction=tx_proto, mark_type=protos.FOR_CACHING, remove_mark=True, - relax_assert=relax_assert) - result = self._stub.MarkAs(request) # noqa: F841 - - @convert_grpc_exceptions - def get_block_count(self) -> int: - self._check_connection() - request = protos.CountRequest(tx_type=protos.BLOCK_TYPE) - result = self._stub.Count(request) - return result.count - - @convert_grpc_exceptions - def get_tx_count(self) -> int: - self._check_connection() - request = protos.CountRequest(tx_type=protos.TRANSACTION_TYPE) - result = self._stub.Count(request) - return result.count - - def get_genesis(self, hash_bytes: bytes) -> Optional['BaseTransaction']: - assert self._genesis_cache is not None - return self._genesis_cache.get(hash_bytes, None) - - def get_all_genesis(self) -> Set['BaseTransaction']: - assert self._genesis_cache is not None - return set(self._genesis_cache.values()) - - @convert_grpc_exceptions - def get_transactions_before(self, hash_bytes, num_blocks=100): # pragma: no cover - from hathor.transaction import tx_or_block_from_proto - self._check_connection() - request = protos.ListRequest( - tx_type=protos.TRANSACTION_TYPE, - hash=hash_bytes, - max_count=num_blocks, - filter_before=True, - ) - result = self._stub.List(request) - tx_list = [] - for list_item in result: - if not list_item.HasField('transaction'): - break - tx_proto = list_item.transaction - tx_list.append(tx_or_block_from_proto(tx_proto, storage=self)) - return tx_list - - @convert_grpc_exceptions - def get_blocks_before(self, hash_bytes: bytes, num_blocks: int = 100) -> List[Block]: - from hathor.transaction import tx_or_block_from_proto - self._check_connection() - request = protos.ListRequest( - tx_type=protos.BLOCK_TYPE, - hash=hash_bytes, - max_count=num_blocks, - filter_before=True, - ) - result = self._stub.List(request) - tx_list: List[Block] = [] - for list_item in result: - if not list_item.HasField('transaction'): - break - tx_proto = list_item.transaction - block = tx_or_block_from_proto(tx_proto, storage=self) - assert isinstance(block, Block) - tx_list.append(block) - return tx_list - - @convert_grpc_exceptions - def get_all_sorted_txs(self, timestamp: int, count: int, offset: int) -> TransactionsIndex: - self._check_connection() - request = protos.SortedTxsRequest( - timestamp=timestamp, - count=count, - offset=offset - ) - result = self._stub.SortedTxs(request) - tx_list = [] - for tx_proto in result: - tx_list.append(TransactionIndexElement(tx_proto.timestamp, tx_proto.hash)) - - all_sorted = TransactionsIndex() - all_sorted.update(tx_list) - return all_sorted - - @convert_grpc_exceptions - def add_value(self, key: str, value: str) -> None: - self._check_connection() - request = protos.AddValueRequest( - key=key, - value=value - ) - result = self._stub.AddValue(request) # noqa: F841 - - @convert_grpc_exceptions - def remove_value(self, key: str) -> None: - self._check_connection() - request = protos.RemoveValueRequest( - key=key, - ) - result = self._stub.RemoveValue(request) # noqa: F841 - - @convert_grpc_exceptions - def get_value(self, key: str) -> Optional[str]: - self._check_connection() - request = protos.GetValueRequest( - key=key - ) - result = self._stub.GetValue(request) - if not result.value: - return None - - return result.value - - -class TransactionStorageServicer(protos.TransactionStorageServicer): - - def __init__(self, tx_storage): - self.log = logger.new() - self.storage = tx_storage - # We must always disable weakref because it will run remotely, which means - # each call will create a new instance of the block/transaction during the - # deserialization process. - self.storage._disable_weakref() - - @convert_hathor_exceptions - def Exists(self, request: protos.ExistsRequest, context: _Context) -> protos.ExistsResponse: - hash_bytes = request.hash - exists = self.storage.transaction_exists(hash_bytes) - return protos.ExistsResponse(exists=exists) - - @convert_hathor_exceptions - def Get(self, request: protos.GetRequest, context: _Context) -> protos.GetResponse: - hash_bytes = request.hash - exclude_metadata = request.exclude_metadata - - tx = self.storage.get_transaction(hash_bytes) - - if exclude_metadata: - del tx._metadata - else: - tx.get_metadata() - - return protos.GetResponse(transaction=tx.to_proto()) - - @convert_hathor_exceptions - def Save(self, request: protos.SaveRequest, context: _Context) -> protos.SaveResponse: - from hathor.transaction import tx_or_block_from_proto - - tx_proto = request.transaction - only_metadata = request.only_metadata - - result = protos.SaveResponse(saved=False) - - tx = tx_or_block_from_proto(tx_proto, storage=self.storage) - self.storage.save_transaction(tx, only_metadata=only_metadata) - result.saved = True - - return result - - @convert_hathor_exceptions - def Remove(self, request: protos.RemoveRequest, context: _Context) -> protos.RemoveResponse: - from hathor.transaction import tx_or_block_from_proto - - tx_proto = request.transaction - - result = protos.RemoveResponse(removed=False) - - tx = tx_or_block_from_proto(tx_proto, storage=self.storage) - self.storage.remove_transaction(tx) - result.removed = True - - return result - - @convert_hathor_exceptions - def Count(self, request: protos.CountRequest, context: _Context) -> protos.CountResponse: - tx_type = request.tx_type - if tx_type == protos.ANY_TYPE: - count = self.storage.get_count_tx_blocks() - elif tx_type == protos.TRANSACTION_TYPE: - count = self.storage.get_tx_count() - elif tx_type == protos.BLOCK_TYPE: - count = self.storage.get_block_count() - else: - raise ValueError('invalid tx_type %s' % (tx_type,)) - return protos.CountResponse(count=count) - - @convert_hathor_exceptions - def LatestTimestamp(self, request: protos.LatestTimestampRequest, - context: _Context) -> protos.LatestTimestampResponse: - return protos.LatestTimestampResponse(timestamp=self.storage.latest_timestamp) - - @convert_hathor_exceptions - def FirstTimestamp(self, request: protos.FirstTimestampRequest, - context: _Context) -> protos.FirstTimestampResponse: - return protos.FirstTimestampResponse(timestamp=self.storage.first_timestamp) - - @convert_hathor_exceptions - def MarkAs(self, request, context): - from hathor.transaction import tx_or_block_from_proto - - tx = tx_or_block_from_proto(request.transaction, storage=self.storage) - - if request.mark_type == protos.FOR_CACHING: - if request.remove_mark: - self.storage._del_from_cache(tx, relax_assert=request.relax_assert) - else: - self.storage._add_to_cache(tx) - else: - raise ValueError('invalid mark_type') - - # TODO: correct value for `marked` - return protos.MarkAsResponse(marked=True) - - @convert_hathor_exceptions_generator - def List(self, request: protos.ListRequest, context: _Context) -> Iterator[protos.ListItemResponse]: - exclude_metadata = request.exclude_metadata - has_more = None - - hash_bytes = request.hash - count = request.max_count - timestamp = request.timestamp - - # TODO: more exceptions for unsupported cases - if request.filter_before: - if request.tx_type == protos.ANY_TYPE: - raise NotImplementedError - elif request.tx_type == protos.TRANSACTION_TYPE: - tx_iter = self.storage.get_transactions_before(hash_bytes, count) - elif request.tx_type == protos.BLOCK_TYPE: - tx_iter = self.storage.get_blocks_before(hash_bytes, count) - else: - raise ValueError('invalid tx_type %s' % (request.tx_type,)) - elif request.time_filter == protos.ONLY_NEWER: - if request.tx_type == protos.ANY_TYPE: - raise NotImplementedError - elif request.tx_type == protos.TRANSACTION_TYPE: - tx_iter, has_more = self.storage.get_newer_txs_after(timestamp, hash_bytes, count) - elif request.tx_type == protos.BLOCK_TYPE: - tx_iter, has_more = self.storage.get_newer_blocks_after(timestamp, hash_bytes, count) - else: - raise ValueError('invalid tx_type %s' % (request.tx_type,)) - elif request.time_filter == protos.ONLY_OLDER: - if request.tx_type == protos.ANY_TYPE: - raise NotImplementedError - elif request.tx_type == protos.TRANSACTION_TYPE: - tx_iter, has_more = self.storage.get_older_txs_after(timestamp, hash_bytes, count) - elif request.tx_type == protos.BLOCK_TYPE: - tx_iter, has_more = self.storage.get_older_blocks_after(timestamp, hash_bytes, count) - else: - raise ValueError('invalid tx_type %s' % (request.tx_type,)) - elif request.time_filter == protos.NO_FILTER: - if request.order_by == protos.ANY_ORDER: - tx_iter = self.storage.get_all_transactions() - elif request.order_by == protos.TOPOLOGICAL_ORDER: - tx_iter = self.storage._topological_sort() - else: - raise ValueError('invalid order_by') - else: - raise ValueError('invalid request') - - for tx in tx_iter: - if exclude_metadata: - del tx._metadata - else: - tx.get_metadata() - yield protos.ListItemResponse(transaction=tx.to_proto()) - if has_more is not None: - yield protos.ListItemResponse(has_more=has_more) - - @convert_hathor_exceptions_generator - def ListTips(self, request: protos.ListTipsRequest, context: _Context) -> Iterator[protos.Interval]: - # XXX: using HasField (and oneof) to differentiate None from 0, which is very important in this context - timestamp = None - if request.HasField('timestamp'): - timestamp = request.timestamp - - if request.tx_type == protos.ANY_TYPE: - tx_intervals = self.storage.get_all_tips(timestamp) - elif request.tx_type == protos.TRANSACTION_TYPE: - tx_intervals = self.storage.get_tx_tips(timestamp) - elif request.tx_type == protos.BLOCK_TYPE: - tx_intervals = self.storage.get_block_tips(timestamp) - else: - raise ValueError('invalid tx_type %s' % (request.tx_type,)) - - for interval in tx_intervals: - yield protos.Interval(begin=interval.begin, end=interval.end, data=interval.data) - - @convert_hathor_exceptions_generator - def ListNewest(self, request: protos.ListNewestRequest, context: _Context) -> Iterator[protos.ListItemResponse]: - has_more = False - if request.tx_type == protos.ANY_TYPE: - raise NotImplementedError - elif request.tx_type == protos.TRANSACTION_TYPE: - tx_list, has_more = self.storage.get_newest_txs(request.count) - elif request.tx_type == protos.BLOCK_TYPE: - tx_list, has_more = self.storage.get_newest_blocks(request.count) - else: - raise ValueError('invalid tx_type %s' % (request.tx_type,)) - - for tx in tx_list: - yield protos.ListItemResponse(transaction=tx.to_proto()) - yield protos.ListItemResponse(has_more=has_more) - - @convert_hathor_exceptions_generator - def SortedTxs(self, request: protos.SortedTxsRequest, context: _Context) -> Iterator[protos.Transaction]: - timestamp = request.timestamp - offset = request.offset - count = request.count - - txs_index = self.storage.get_all_sorted_txs(timestamp, count, offset) - for tx_element in txs_index[:]: - yield protos.Transaction(timestamp=tx_element.timestamp, hash=tx_element.hash) - - @convert_hathor_exceptions - def AddValue(self, request: protos.AddValueRequest, context: _Context) -> protos.Empty: - key = request.key - value = request.value - - self.storage.add_value(key, value) - return protos.Empty() - - @convert_hathor_exceptions - def RemoveValue(self, request: protos.RemoveValueRequest, context: _Context) -> protos.Empty: - key = request.key - self.storage.remove_value(key) - return protos.Empty() - - @convert_hathor_exceptions - def GetValue(self, request: protos.GetValueRequest, context: _Context) -> protos.GetValueResponse: - key = request.key - value = self.storage.get_value(key) - - if value: - return protos.GetValueResponse(value=value) - else: - return protos.GetValueResponse() - - -def create_transaction_storage_server(server: grpc.Server, tx_storage: TransactionStorage, - port: Optional[int] = None) -> Tuple[protos.TransactionStorageServicer, int]: - """Create a GRPC servicer for the given storage, returns a (servicer, port) tuple. - - :param server: a GRPC server - :type server: :py:class:`grpc.Server` - - :param tx_storage: an instance of TransactionStorage - :type tx_storage: :py:class:`hathor.transaction.storage.TransactionStorage` - - :param port: optional listen port, if None a random port will be chosen (and returned) - :type server: :py:class:`typing.Optional[int]` - - :rtype :py:class:`typing.Tuple[hathor.protos.TransactionStorageServicer, int]` - """ - servicer = TransactionStorageServicer(tx_storage) - protos.add_TransactionStorageServicer_to_server(servicer, server) - port = server.add_insecure_port('127.0.0.1:0') - assert port is not None - return servicer, port diff --git a/hathor/transaction/storage/rocksdb_storage.py b/hathor/transaction/storage/rocksdb_storage.py index 37a8d664f..aacb0a677 100644 --- a/hathor/transaction/storage/rocksdb_storage.py +++ b/hathor/transaction/storage/rocksdb_storage.py @@ -16,13 +16,13 @@ from typing import TYPE_CHECKING, Iterator, Optional from hathor.transaction.storage.exceptions import TransactionDoesNotExist -from hathor.transaction.storage.transaction_storage import BaseTransactionStorage, TransactionStorageAsyncFromSync +from hathor.transaction.storage.transaction_storage import BaseTransactionStorage if TYPE_CHECKING: from hathor.transaction import BaseTransaction -class TransactionRocksDBStorage(BaseTransactionStorage, TransactionStorageAsyncFromSync): +class TransactionRocksDBStorage(BaseTransactionStorage): """This storage saves tx and metadata to the same key on RocksDB It uses Protobuf serialization internally. diff --git a/hathor/transaction/storage/subprocess_storage.py b/hathor/transaction/storage/subprocess_storage.py deleted file mode 100644 index c4e9bd244..000000000 --- a/hathor/transaction/storage/subprocess_storage.py +++ /dev/null @@ -1,78 +0,0 @@ -# Copyright 2021 Hathor Labs -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from concurrent import futures -from multiprocessing import Process, Queue - -import grpc - -from hathor.exception import HathorError -from hathor.transaction.storage.remote_storage import TransactionRemoteStorage, create_transaction_storage_server - - -class SubprocessNotAliveError(HathorError): - pass - - -class TransactionSubprocessStorage(TransactionRemoteStorage, Process): - """Subprocess storage to be used 'on top' of other storages. - - Wraps a given store constructor and spawns it on a subprocess. - """ - - def __init__(self, store_constructor, with_index=None): - """ - :param store_constructor: a callable that returns an instance of TransactionStorage - :type store_constructor: :py:class:`typing.Callable[..., hathor.transaction.storage.TransactionStorage]` - """ - Process.__init__(self) - TransactionRemoteStorage.__init__(self, with_index=with_index) - self._store_constructor = store_constructor - # this queue is used by the subprocess to inform which port was selected - self._port_q: 'Queue[int]' = Queue(1) - # this queue is used to inform the subprocess it can end - self._exit_q: 'Queue[int]' = Queue(1) - - def _check_connection(self): - """raise error if subprocess is not alive""" - super()._check_connection() - if not self.is_alive(): - raise SubprocessNotAliveError('subprocess is dead') - - def stop(self): - self._exit_q.put(None) - if self._channel: - self._channel.close() - - def start(self): - super().start() - port = self._port_q.get() - self.connect_to(port) - - def terminate(self): - self.close() - super().terminate() - - def run(self): - """internal method for Process interface, do not run directly""" - # TODO: some tuning with benchmarks - server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) - tx_storage = self._store_constructor() - tx_storage._manually_initialize() - _servicer, port = create_transaction_storage_server(server, tx_storage) - self._port_q.put(port) - server.start() - self._exit_q.get() - # the above all blocks until _exit_q.put(None) or _exit_q closes - server.stop(0) diff --git a/hathor/transaction/storage/transaction_storage.py b/hathor/transaction/storage/transaction_storage.py index d20cd43e8..03449f49d 100644 --- a/hathor/transaction/storage/transaction_storage.py +++ b/hathor/transaction/storage/transaction_storage.py @@ -16,12 +16,11 @@ from abc import ABC, abstractmethod, abstractproperty from collections import deque from threading import Lock -from typing import Any, Dict, Generator, Iterator, List, NamedTuple, Optional, Set, Tuple, cast +from typing import Any, Dict, Iterator, List, NamedTuple, Optional, Set, Tuple, cast from weakref import WeakValueDictionary from intervaltree.interval import Interval from structlog import get_logger -from twisted.internet.defer import Deferred, inlineCallbacks, succeed from hathor.conf import HathorSettings from hathor.indexes import IndexesManager, TokensIndex, TransactionsIndex, WalletIndex @@ -30,7 +29,6 @@ from hathor.transaction.storage.exceptions import TransactionDoesNotExist, TransactionIsNotABlock from hathor.transaction.transaction import BaseTransaction from hathor.transaction.transaction_metadata import TransactionMetadata -from hathor.util import skip_warning settings = HathorSettings() @@ -271,92 +269,6 @@ def get_count_tx_blocks(self) -> int: """ raise NotImplementedError - """Async interface, all methods mirrorred from TransactionStorageSync, but suffixed with `_deferred`.""" - - @abstractmethod - def save_transaction_deferred(self, tx: BaseTransaction, *, only_metadata: bool = False) -> None: - """Saves the tx. - - :param tx: Transaction to save - :type tx: :py:class:`hathor.transaction.BaseTransaction` - - :param only_metadata: Don't save the transaction, only the metadata of this transaction - :type only_metadata: bool - - :rtype :py:class:`twisted.internet.defer.Deferred[None]` - """ - if self.with_index: - self._add_to_cache(tx) - return succeed(None) - - @abstractmethod - def remove_transaction_deferred(self, tx: BaseTransaction) -> None: - """Remove the tx. - - :param tx: Transaction to be removed - - :rtype :py:class:`twisted.internet.defer.Deferred[None]` - """ - if self.with_index: - self._del_from_cache(tx) - return succeed(None) - - @abstractmethod - def transaction_exists_deferred(self, hash_bytes: bytes) -> bool: - """Returns `True` if transaction with hash `hash_bytes` exists. - - :param hash_bytes: Hash in bytes that will be checked. - :type hash_bytes: bytes - - :rtype :py:class:`twisted.internet.defer.Deferred[bool]` - """ - raise NotImplementedError - - @abstractmethod - def get_transaction_deferred(self, hash_bytes: bytes) -> BaseTransaction: - """Returns the transaction with hash `hash_bytes`. - - :param hash_bytes: Hash in bytes that will be checked. - :type hash_bytes: bytes - - :rtype :py:class:`twisted.internet.defer.Deferred[hathor.transaction.BaseTransaction]` - """ - raise NotImplementedError - - @inlineCallbacks - def get_metadata_deferred(self, hash_bytes: bytes) -> Generator[Any, Any, Optional[TransactionMetadata]]: - """Returns the transaction metadata with hash `hash_bytes`. - - :param hash_bytes: Hash in bytes that will be checked. - :type hash_bytes: bytes - - :rtype :py:class:`twisted.internet.defer.Deferred[hathor.transaction.TransactionMetadata]` - """ - try: - tx = yield self.get_transaction_deferred(hash_bytes) - return tx.get_metadata(use_storage=False) - except TransactionDoesNotExist: - return None - - @abstractmethod - def get_all_transactions_deferred(self) -> Iterator[BaseTransaction]: - # TODO: find an `async generator` type - # TODO: verify the following claim: - """Return all transactions that are not blocks. - - :rtype :py:class:`twisted.internet.defer.Deferred[typing.Iterable[hathor.transaction.BaseTransaction]]` - """ - raise NotImplementedError - - @abstractmethod - def get_count_tx_blocks_deferred(self) -> int: - # TODO: verify the following claim: - """Return the number of transactions/blocks stored. - - :rtype :py:class:`twisted.internet.defer.Deferred[int]` - """ - raise NotImplementedError - @abstractproperty def latest_timestamp(self) -> int: raise NotImplementedError @@ -644,28 +556,6 @@ def is_db_clean(self) -> bool: return self.get_value(self._clean_db_attribute) == '1' -class TransactionStorageAsyncFromSync(TransactionStorage): - """Implement async interface from sync interface, for legacy implementations.""" - - def save_transaction_deferred(self, tx: BaseTransaction, *, only_metadata: bool = False) -> Deferred: - return succeed(skip_warning(self.save_transaction)(tx, only_metadata=only_metadata)) - - def remove_transaction_deferred(self, tx: BaseTransaction) -> Deferred: - return succeed(skip_warning(self.remove_transaction)(tx)) - - def transaction_exists_deferred(self, hash_bytes: bytes) -> Deferred: - return succeed(skip_warning(self.transaction_exists)(hash_bytes)) - - def get_transaction_deferred(self, hash_bytes: bytes) -> Deferred: - return succeed(skip_warning(self.get_transaction)(hash_bytes)) - - def get_all_transactions_deferred(self) -> Deferred: - return succeed(skip_warning(self.get_all_transactions)()) - - def get_count_tx_blocks_deferred(self) -> Deferred: - return succeed(skip_warning(self.get_count_tx_blocks)()) - - class BaseTransactionStorage(TransactionStorage): def __init__(self, with_index: bool = True, pubsub: Optional[Any] = None) -> None: super().__init__() diff --git a/tests/p2p/test_double_spending.py b/tests/p2p/test_double_spending.py index 4238b8b75..fa073bd43 100644 --- a/tests/p2p/test_double_spending.py +++ b/tests/p2p/test_double_spending.py @@ -2,7 +2,7 @@ from hathor.crypto.util import decode_address from tests import unittest -from tests.utils import add_blocks_unlock_reward, add_new_blocks, add_new_tx, start_remote_storage +from tests.utils import add_blocks_unlock_reward, add_new_blocks, add_new_tx class HathorSyncMethodsTestCase(unittest.TestCase): @@ -311,15 +311,3 @@ def test_double_spending_propagation(self): # dot2 = self.manager1.tx_storage.graphviz_funds(format='pdf', acc_weight=True) # dot2.render('dot2') - - -class RemoteStorageSyncTest(HathorSyncMethodsTestCase): - def setUp(self): - super().setUp() - tx_storage, self._server = start_remote_storage() - - self.manager1.tx_storage = tx_storage - - def tearDown(self): - self._server.stop(0).wait() - super().tearDown() diff --git a/tests/p2p/test_sync.py b/tests/p2p/test_sync.py index 5ecda8a5e..1c9edd552 100644 --- a/tests/p2p/test_sync.py +++ b/tests/p2p/test_sync.py @@ -5,9 +5,7 @@ from hathor.p2p.protocol import PeerIdState from hathor.simulator import FakeConnection from hathor.transaction.storage.exceptions import TransactionIsNotABlock -from hathor.transaction.storage.remote_storage import RemoteCommunicationError, TransactionRemoteStorage from tests import unittest -from tests.utils import start_remote_storage class HathorSyncMethodsTestCase(unittest.TestCase): @@ -72,12 +70,8 @@ def test_get_blocks_before(self): self.assertEqual(0, len(result)) genesis_tx = [tx for tx in self.genesis if not tx.is_block][0] - if isinstance(self.manager1.tx_storage, TransactionRemoteStorage): - with self.assertRaises(RemoteCommunicationError): - self.manager1.tx_storage.get_blocks_before(genesis_tx.hash) - else: - with self.assertRaises(TransactionIsNotABlock): - self.manager1.tx_storage.get_blocks_before(genesis_tx.hash) + with self.assertRaises(TransactionIsNotABlock): + self.manager1.tx_storage.get_blocks_before(genesis_tx.hash) blocks = self._add_new_blocks(20) num_blocks = 5 @@ -301,15 +295,3 @@ def test_downloader(self): # And try again downloader.check_downloading_queue() self.assertEqual(len(downloader.downloading_deque), 0) - - -class RemoteStorageSyncTest(HathorSyncMethodsTestCase): - def setUp(self): - super().setUp() - tx_storage, self._server = start_remote_storage() - - self.manager1.tx_storage = tx_storage - - def tearDown(self): - self._server.stop(0).wait() - super().tearDown() diff --git a/tests/resources/transaction/test_create_tx.py b/tests/resources/transaction/test_create_tx.py index df34b0074..f660ad0c7 100644 --- a/tests/resources/transaction/test_create_tx.py +++ b/tests/resources/transaction/test_create_tx.py @@ -7,7 +7,7 @@ from hathor.transaction.resources import CreateTxResource from hathor.transaction.scripts import P2PKH, create_base_script from tests.resources.base_resource import StubSite, _BaseResourceTest -from tests.utils import add_blocks_unlock_reward, add_new_blocks, add_new_tx, start_remote_storage +from tests.utils import add_blocks_unlock_reward, add_new_blocks, add_new_tx class TransactionTest(_BaseResourceTest._ResourceTest): @@ -357,14 +357,3 @@ def test_invalid_address(self): }) # TODO: tests that use the tokens field (i.e. not only HTR) - - -class RemoteStorageTransactionTest(TransactionTest): - def setUp(self): - self.tx_storage, self._server = start_remote_storage() - self.tx_storage.with_index = True - super().setUp() - - def tearDown(self): - super().tearDown() - self._server.stop(0).wait() diff --git a/tests/resources/transaction/test_tx.py b/tests/resources/transaction/test_tx.py index 6af3e1f8f..b7138ec85 100644 --- a/tests/resources/transaction/test_tx.py +++ b/tests/resources/transaction/test_tx.py @@ -3,7 +3,7 @@ from hathor.transaction import Transaction from hathor.transaction.resources import TransactionResource from tests.resources.base_resource import StubSite, _BaseResourceTest -from tests.utils import add_blocks_unlock_reward, add_new_blocks, add_new_transactions, start_remote_storage +from tests.utils import add_blocks_unlock_reward, add_new_blocks, add_new_transactions class TransactionTest(_BaseResourceTest._ResourceTest): @@ -231,14 +231,3 @@ def test_invalid_params(self): }) data = response.json_value() self.assertFalse(data['success']) - - -class RemoteStorageTransactionTest(TransactionTest): - def setUp(self): - self.tx_storage, self._server = start_remote_storage() - self.tx_storage.with_index = True - super().setUp() - - def tearDown(self): - super().tearDown() - self._server.stop(0).wait() diff --git a/tests/tx/test_cache_storage.py b/tests/tx/test_cache_storage.py index 9edcb847c..423bab0ec 100644 --- a/tests/tx/test_cache_storage.py +++ b/tests/tx/test_cache_storage.py @@ -1,13 +1,8 @@ -import collections - -from twisted.internet.defer import inlineCallbacks - from hathor.daa import TestMode, _set_test_mode -from hathor.transaction import Block, Transaction, TransactionMetadata, TxOutput -from hathor.transaction.scripts import P2PKH +from hathor.transaction import Transaction, TransactionMetadata from hathor.transaction.storage import TransactionCacheStorage, TransactionMemoryStorage from tests import unittest -from tests.utils import BURN_ADDRESS, MIN_TIMESTAMP, add_new_blocks, add_new_transactions +from tests.utils import add_new_blocks, add_new_transactions CACHE_SIZE = 5 @@ -26,7 +21,7 @@ def setUp(self): self.genesis_txs = [tx for tx in self.genesis if not tx.is_block] # Save genesis metadata - self.cache_storage.save_transaction_deferred(self.genesis_txs[0], only_metadata=True) + self.cache_storage.save_transaction(self.genesis_txs[0], only_metadata=True) self.manager = self.create_peer('testnet', tx_storage=self.cache_storage, unlock_wallet=True) @@ -143,58 +138,6 @@ def test_flush_thread(self): del self.cache_storage.cache[next(iter(self.cache_storage.dirty_txs))] self.cache_storage._flush_to_storage(self.cache_storage.dirty_txs.copy()) - def test_deferred_methods(self): - for _ in self._test_deferred_methods(): - pass - - @inlineCallbacks - def _test_deferred_methods(self): - # Testing without cloning - self.cache_storage._clone_if_needed = False - - block_parents = [block.hash for block in self.genesis_blocks] + [tx.hash for tx in self.genesis_txs] - output = TxOutput(200, P2PKH.create_output_script(BURN_ADDRESS)) - obj = Block(timestamp=MIN_TIMESTAMP, weight=12, outputs=[output], parents=block_parents, nonce=100781, - storage=self.cache_storage) - obj.resolve() - obj.verify() - - self.cache_storage.save_transaction_deferred(obj) - - loaded_obj1 = yield self.cache_storage.get_transaction_deferred(obj.hash) - - metadata_obj1_def = yield self.cache_storage.get_metadata_deferred(obj.hash) - metadata_obj1 = obj.get_metadata() - self.assertEqual(metadata_obj1_def, metadata_obj1) - metadata_error = yield self.cache_storage.get_metadata_deferred( - bytes.fromhex('0001569c85fffa5782c3979e7d68dce1d8d84772505a53ddd76d636585f3977e')) - self.assertIsNone(metadata_error) - - self.cache_storage._flush_to_storage(self.cache_storage.dirty_txs.copy()) - self.cache_storage.cache = collections.OrderedDict() - loaded_obj2 = yield self.cache_storage.get_transaction_deferred(obj.hash) - - self.assertEqual(loaded_obj1, loaded_obj2) - - self.assertTrue((yield self.cache_storage.transaction_exists_deferred(obj.hash))) - self.assertFalse((yield self.cache_storage.transaction_exists_deferred( - '0001569c85fffa5782c3979e7d68dce1d8d84772505a53ddd76d636585f3977e'))) - - self.assertFalse( - self.cache_storage.transaction_exists('0001569c85fffa5782c3979e7d68dce1d8d84772505a53ddd76d636585f3977e')) - - self.assertEqual(obj, loaded_obj1) - self.assertEqual(obj.is_block, loaded_obj1.is_block) - - count = yield self.cache_storage.get_count_tx_blocks_deferred() - self.assertEqual(count, 4) - - all_transactions = yield self.cache_storage.get_all_transactions_deferred() - total = 0 - for tx in all_transactions: - total += 1 - self.assertEqual(total, 4) - def test_topological_sort_dfs(self): _set_test_mode(TestMode.TEST_ALL_WEIGHT) add_new_blocks(self.manager, 11, advance_clock=1) diff --git a/tests/tx/test_remote_storage.py b/tests/tx/test_remote_storage.py deleted file mode 100644 index 1639eb855..000000000 --- a/tests/tx/test_remote_storage.py +++ /dev/null @@ -1,72 +0,0 @@ -import datetime - -from hathor.transaction import Block, Transaction -from hathor.transaction.base_transaction import tx_or_block_from_proto -from hathor.transaction.storage.remote_storage import RemoteCommunicationError -from tests import unittest -from tests.utils import add_new_blocks, add_new_transactions, start_remote_storage - - -class RemoteStorageTest(unittest.TestCase): - def setUp(self): - super().setUp() - tx_storage, self._server = start_remote_storage() - - self.network = 'testnet' - self.manager = self.create_peer(self.network, unlock_wallet=True) - self.manager.tx_storage = tx_storage - - self.genesis = tx_storage.get_all_genesis() - self.genesis_blocks = [tx for tx in self.genesis if tx.is_block] - self.genesis_txs = [tx for tx in self.genesis if not tx.is_block] - - def tearDown(self): - self._server.stop(0).wait() - super().tearDown() - - def test_exceptions(self): - self._server.stop(0).wait() - - with self.assertRaises(RemoteCommunicationError): - self.manager.tx_storage.get_block_count() - - def test_get_txs(self): - first_block = add_new_blocks(self.manager, 30, advance_clock=1)[0] - first_tx = add_new_transactions(self.manager, 3, advance_clock=1)[0] - - # Using timestamp as float to test code - txs, _ = self.manager.tx_storage.get_older_txs_after(float(first_tx.timestamp), first_tx.hash, 3) - self.assertEqual(len(txs), 2) - - txs, _ = self.manager.tx_storage.get_newer_txs_after(float(first_tx.timestamp), first_tx.hash, 3) - self.assertEqual(len(txs), 2) - - blocks, _ = self.manager.tx_storage.get_older_blocks_after(float(first_block.timestamp), first_block.hash, 3) - self.assertEqual(len(blocks), 1) - - blocks, _ = self.manager.tx_storage.get_newer_blocks_after(float(first_block.timestamp), first_block.hash, 3) - self.assertEqual(len(blocks), 3) - - tx = txs[0] - proto = tx.to_proto() - tx2 = Transaction.create_from_proto(proto) - self.assertEqual(tx, tx2) - - block = blocks[0] - proto2 = block.to_proto() - block2 = Block.create_from_proto(proto2) - self.assertEqual(block, block2) - - tx3 = tx_or_block_from_proto(proto) - self.assertEqual(tx, tx3) - - proto.ClearField('transaction') - - with self.assertRaises(ValueError): - tx_or_block_from_proto(proto) - - t = datetime.datetime.now() - datetime.timedelta(seconds=1) - t_tx = tx.get_time_from_now() - t2_tx = tx.get_time_from_now(now=t) - - self.assertNotEqual(t_tx, t2_tx) diff --git a/tests/tx/test_tx_serialization.py b/tests/tx/test_tx_serialization.py index 30a30506d..5e7077f03 100644 --- a/tests/tx/test_tx_serialization.py +++ b/tests/tx/test_tx_serialization.py @@ -1,4 +1,3 @@ -import hathor.protos.transaction_pb2_grpc # noqa this file has nothing to test, only import from hathor.crypto.util import decode_address from hathor.transaction import Transaction from hathor.wallet.base_wallet import WalletOutputInfo diff --git a/tests/tx/test_tx_storage.py b/tests/tx/test_tx_storage.py index f1ef9e5eb..8a37c3a09 100644 --- a/tests/tx/test_tx_storage.py +++ b/tests/tx/test_tx_storage.py @@ -21,7 +21,6 @@ TransactionCompactStorage, TransactionMemoryStorage, TransactionRocksDBStorage, - TransactionSubprocessStorage, ) from hathor.transaction.storage.exceptions import TransactionDoesNotExist from hathor.wallet import Wallet @@ -32,7 +31,6 @@ add_new_blocks, add_new_transactions, create_tokens, - start_remote_storage, ) try: @@ -406,25 +404,6 @@ def test_key_value_attribute(self): # Key should not exist again self.assertIsNone(self.tx_storage.get_value(attr)) - class _RemoteStorageTest(_TransactionStorageTest): - def setUp(self, tx_storage, reactor=None): - tx_storage, self._server = start_remote_storage(tx_storage=tx_storage) - super().setUp(tx_storage, reactor=reactor) - - def tearDown(self): - self._server.stop(0) - super().tearDown() - - class _SubprocessStorageTest(_TransactionStorageTest): - def setUp(self, tx_storage_constructor, reactor=None): - tx_storage = TransactionSubprocessStorage(tx_storage_constructor) - tx_storage.start() - super().setUp(tx_storage, reactor=reactor) - - def tearDown(self): - self.tx_storage.stop() - super().tearDown() - class TransactionBinaryStorageTest(_BaseTransactionStorageTest._TransactionStorageTest): def setUp(self): @@ -492,31 +471,6 @@ def setUp(self): super().setUp(TransactionCacheStorage(store, reactor, capacity=5)) -# class SubprocessMemoryStorageTest(_BaseTransactionStorageTest._SubprocessStorageTest): -# def setUp(self): -# super().setUp(TransactionMemoryStorage) - -# class SubprocessCacheMemoryStorageTest(_BaseTransactionStorageTest._SubprocessStorageTest): -# def setUp(self): -# def storage_constructor(): -# store = TransactionMemoryStorage() -# reactor = Clock() -# return TransactionCacheStorage(store, reactor, capacity=5) -# super().setUp(storage_constructor) - - -class RemoteMemoryStorageTest(_BaseTransactionStorageTest._RemoteStorageTest): - def setUp(self): - super().setUp(TransactionMemoryStorage()) - - -class RemoteCacheMemoryStorageTest(_BaseTransactionStorageTest._RemoteStorageTest): - def setUp(self): - store = TransactionMemoryStorage() - reactor = Clock() - super().setUp(TransactionCacheStorage(store, reactor, capacity=5)) - - @pytest.mark.skipif(not HAS_ROCKSDB, reason='requires python-rocksdb') class TransactionRocksDBStorageTest(_BaseTransactionStorageTest._TransactionStorageTest): def setUp(self): diff --git a/tests/utils.py b/tests/utils.py index 5fd91a8b6..fa299adcd 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -3,10 +3,8 @@ import subprocess import time import urllib.parse -from concurrent import futures from typing import List, Optional, cast -import grpc import requests from hathor.conf import HathorSettings @@ -14,11 +12,6 @@ from hathor.manager import HathorManager from hathor.transaction import Transaction, TxInput, TxOutput, genesis from hathor.transaction.scripts import P2PKH, HathorScript, Opcode -from hathor.transaction.storage import ( - TransactionMemoryStorage, - TransactionRemoteStorage, - create_transaction_storage_server, -) from hathor.transaction.token_creation_tx import TokenCreationTransaction from hathor.transaction.util import get_deposit_amount @@ -428,29 +421,6 @@ def create_tokens(manager: 'HathorManager', address_b58: Optional[str] = None, m return tx -def start_remote_storage(tx_storage=None): - """ Starts a remote storage - - :param tx_storage: storage to run in the remote storage - :type tx_storage: :py:class:`hathor.transaction.storage.TransactionStorage` - - :return: Remote tx storage and the remote server - :rtype: Tuple[:py:class:`hathor.transaction.storage.TransactionRemoteStorage`, grpc server] - """ - if not tx_storage: - tx_storage = TransactionMemoryStorage() - - _server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) - tx_storage._manually_initialize() - _servicer, port = create_transaction_storage_server(_server, tx_storage) - _server.start() - - tx_storage = TransactionRemoteStorage() - tx_storage.connect_to(port) - - return tx_storage, _server - - def create_script_with_sigops(nops: int) -> bytes: """ Generate a script with multiple OP_CHECKMULTISIG that amounts to `nops` sigops """