Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions hathor/cli/run_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ def create_parser(self) -> ArgumentParser:
parser.add_argument('--stratum', type=int, help='Port to run stratum server')
parser.add_argument('--data', help='Data directory')
parser.add_argument('--rocksdb-storage', action='store_true', help='Use RocksDB storage backend')
# parser.add_argument('--rocksdb-cache', type=int, help='Size (bytes) of RocksDB block-table cache', default=0)
parser.add_argument('--wallet', help='Set wallet type. Options are hd (Hierarchical Deterministic) or keypair',
default=None)
parser.add_argument('--wallet-enable-api', action='store_true',
Expand Down Expand Up @@ -88,7 +89,7 @@ def prepare(self, args: Namespace) -> None:
TransactionCacheStorage,
TransactionCompactStorage,
TransactionMemoryStorage,
TransactionRocksDBStorage,
TransactionOldRocksDBStorage,
TransactionStorage,
)
from hathor.wallet import HDWallet, Wallet
Expand Down Expand Up @@ -166,7 +167,10 @@ def create_wallet():
tx_storage: TransactionStorage
if args.data:
if args.rocksdb_storage:
tx_storage = TransactionRocksDBStorage(path=args.data, with_index=(not args.cache))
# cache_capacity = args.rocksdb_cache or None
# tx_storage = TransactionRocksDBStorage(path=args.data, with_index=(not args.cache),
# cache_capacity=cache_capacity)
tx_storage = TransactionOldRocksDBStorage(path=args.data, with_index=(not args.cache))
else:
tx_storage = TransactionCompactStorage(path=args.data, with_index=(not args.cache))
self.log.info('with storage', storage_class=type(tx_storage).__name__, path=args.data)
Expand Down
2 changes: 2 additions & 0 deletions hathor/transaction/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from hathor.transaction.storage.transaction_storage import TransactionStorage

try:
from hathor.transaction.storage.old_rocksdb_storage import TransactionOldRocksDBStorage
from hathor.transaction.storage.rocksdb_storage import TransactionRocksDBStorage
except ImportError:
pass
Expand All @@ -30,4 +31,5 @@
'TransactionCacheStorage',
'TransactionBinaryStorage',
'TransactionRocksDBStorage',
'TransactionOldRocksDBStorage',
]
141 changes: 141 additions & 0 deletions hathor/transaction/storage/old_rocksdb_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
# Copyright 2021 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
from typing import TYPE_CHECKING, Iterator, Optional

from hathor.transaction.storage.exceptions import TransactionDoesNotExist
from hathor.transaction.storage.transaction_storage import BaseTransactionStorage

if TYPE_CHECKING:
from hathor.transaction import BaseTransaction


class TransactionOldRocksDBStorage(BaseTransactionStorage):
"""This storage saves tx and metadata to the same key on RocksDB

It uses Protobuf serialization internally.
"""

def __init__(self, path='./', with_index=True):
import rocksdb
tx_dir = os.path.join(path, 'tx.db')
self._db = rocksdb.DB(tx_dir, rocksdb.Options(create_if_missing=True))

attributes_dir = os.path.join(path, 'attributes.db')
self.attributes_db = rocksdb.DB(attributes_dir, rocksdb.Options(create_if_missing=True))
super().__init__(with_index=with_index)

def _load_from_bytes(self, data: bytes) -> 'BaseTransaction':
from hathor import protos
from hathor.transaction.base_transaction import tx_or_block_from_proto

tx_proto = protos.BaseTransaction()
tx_proto.ParseFromString(data)
return tx_or_block_from_proto(tx_proto, storage=self)

def _tx_to_bytes(self, tx: 'BaseTransaction') -> bytes:
tx_proto = tx.to_proto()
return tx_proto.SerializeToString()

def remove_transaction(self, tx: 'BaseTransaction') -> None:
super().remove_transaction(tx)
self._db.delete(tx.hash)
self._remove_from_weakref(tx)

def save_transaction(self, tx: 'BaseTransaction', *, only_metadata: bool = False) -> None:
super().save_transaction(tx, only_metadata=only_metadata)
self._save_transaction(tx, only_metadata=only_metadata)
self._save_to_weakref(tx)

def _save_transaction(self, tx: 'BaseTransaction', *, only_metadata: bool = False) -> None:
data = self._tx_to_bytes(tx)
key = tx.hash
self._db.put(key, data)

def transaction_exists(self, hash_bytes: bytes) -> bool:
may_exist, _ = self._db.key_may_exist(hash_bytes)
if not may_exist:
return False
tx_exists = self._get_transaction_from_db(hash_bytes) is not None
return tx_exists

def _get_transaction(self, hash_bytes: bytes) -> 'BaseTransaction':
tx = self.get_transaction_from_weakref(hash_bytes)
if tx is not None:
return tx

tx = self._get_transaction_from_db(hash_bytes)
if not tx:
raise TransactionDoesNotExist(hash_bytes.hex())

assert tx.hash == hash_bytes

self._save_to_weakref(tx)
return tx

def _get_transaction_from_db(self, hash_bytes: bytes) -> Optional['BaseTransaction']:
key = hash_bytes
data = self._db.get(key)
if data is None:
return None
tx = self._load_from_bytes(data)
return tx

def get_all_transactions(self) -> Iterator['BaseTransaction']:
tx: Optional['BaseTransaction']

items = self._db.iteritems()
items.seek_to_first()

def get_tx(hash_bytes, data):
tx = self.get_transaction_from_weakref(hash_bytes)
if tx is None:
tx = self._load_from_bytes(data)
assert tx.hash == hash_bytes
self._save_to_weakref(tx)
return tx

for key, data in items:
hash_bytes = key

lock = self._get_lock(hash_bytes)
if lock:
with lock:
tx = get_tx(hash_bytes, data)
else:
tx = get_tx(hash_bytes, data)

assert tx is not None
yield tx

def get_count_tx_blocks(self) -> int:
# XXX: there may be a more efficient way, see: https://stackoverflow.com/a/25775882
keys = self._db.iterkeys()
keys.seek_to_first()
keys_count = sum(1 for _ in keys)
return keys_count

def add_value(self, key: str, value: str) -> None:
self.attributes_db.put(key.encode('utf-8'), value.encode('utf-8'))

def remove_value(self, key: str) -> None:
self.attributes_db.delete(key.encode('utf-8'))

def get_value(self, key: str) -> Optional[str]:
data = self.attributes_db.get(key.encode('utf-8'))
if data is None:
return None
else:
return data.decode()
128 changes: 86 additions & 42 deletions hathor/transaction/storage/rocksdb_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,16 @@

from hathor.transaction.storage.exceptions import TransactionDoesNotExist
from hathor.transaction.storage.transaction_storage import BaseTransactionStorage
from hathor.util import json_dumpb, json_loadb

if TYPE_CHECKING:
from hathor.transaction import BaseTransaction
from hathor.transaction import BaseTransaction, TransactionMetadata


_DB_NAME = 'data_v2.db'
_CF_NAME_TX = b'tx'
_CF_NAME_META = b'meta'
_CF_NAME_ATTR = b'attr'


class TransactionRocksDBStorage(BaseTransactionStorage):
Expand All @@ -28,30 +35,62 @@ class TransactionRocksDBStorage(BaseTransactionStorage):
It uses Protobuf serialization internally.
"""

def __init__(self, path='./', with_index=True):
def __init__(self, path: str = './', with_index: bool = True, cache_capacity: Optional[int] = None):
import rocksdb
tx_dir = os.path.join(path, 'tx.db')
self._db = rocksdb.DB(tx_dir, rocksdb.Options(create_if_missing=True))

attributes_dir = os.path.join(path, 'attributes.db')
self.attributes_db = rocksdb.DB(attributes_dir, rocksdb.Options(create_if_missing=True))
tx_dir = os.path.join(path, _DB_NAME)
lru_cache = cache_capacity and rocksdb.LRUCache(cache_capacity)
table_factory = rocksdb.BlockBasedTableFactory(block_cache=lru_cache)
opts = dict(
table_factory=table_factory,
write_buffer_size=83886080, # 80MB (default is 4MB)
compression=rocksdb.CompressionType.no_compression,
allow_mmap_writes=True, # default is False
allow_mmap_reads=True, # default is already True
)

try:
self.log.debug('create new db')
new_db = rocksdb.DB(tx_dir, rocksdb.Options(error_if_exists=True, create_if_missing=True, **opts))
new_db.create_column_family(_CF_NAME_TX, rocksdb.ColumnFamilyOptions())
new_db.create_column_family(_CF_NAME_META, rocksdb.ColumnFamilyOptions())
new_db.create_column_family(_CF_NAME_ATTR, rocksdb.ColumnFamilyOptions())
del new_db # XXX: this forces deallocation allowing us to reopen the db right after
except rocksdb.errors.InvalidArgument:
self.log.debug('db already exists')

options = rocksdb.Options(**opts)
self._db = rocksdb.DB(tx_dir, options, column_families={
_CF_NAME_TX: rocksdb.ColumnFamilyOptions(),
_CF_NAME_META: rocksdb.ColumnFamilyOptions(),
_CF_NAME_ATTR: rocksdb.ColumnFamilyOptions(),
})

self._cf_tx = self._db.get_column_family(_CF_NAME_TX)
self._cf_meta = self._db.get_column_family(_CF_NAME_META)
self._cf_attr = self._db.get_column_family(_CF_NAME_ATTR)

super().__init__(with_index=with_index)

def _load_from_bytes(self, data: bytes) -> 'BaseTransaction':
from hathor import protos
from hathor.transaction.base_transaction import tx_or_block_from_proto
def _load_from_bytes(self, tx_data: bytes, meta_data: bytes) -> 'BaseTransaction':
from hathor.transaction.base_transaction import tx_or_block_from_bytes
from hathor.transaction.transaction_metadata import TransactionMetadata

tx_proto = protos.BaseTransaction()
tx_proto.ParseFromString(data)
return tx_or_block_from_proto(tx_proto, storage=self)
tx = tx_or_block_from_bytes(tx_data)
tx._metadata = TransactionMetadata.create_from_json(json_loadb(meta_data))
tx.storage = self
return tx

def _tx_to_bytes(self, tx: 'BaseTransaction') -> bytes:
tx_proto = tx.to_proto()
return tx_proto.SerializeToString()
return bytes(tx)

def _meta_to_bytes(self, meta: 'TransactionMetadata') -> bytes:
return json_dumpb(meta.to_json())

def remove_transaction(self, tx: 'BaseTransaction') -> None:
super().remove_transaction(tx)
self._db.delete(tx.hash)
self._db.delete((self._cf_tx, tx.hash))
self._db.delete((self._cf_meta, tx.hash))
self._remove_from_weakref(tx)

def save_transaction(self, tx: 'BaseTransaction', *, only_metadata: bool = False) -> None:
Expand All @@ -60,15 +99,18 @@ def save_transaction(self, tx: 'BaseTransaction', *, only_metadata: bool = False
self._save_to_weakref(tx)

def _save_transaction(self, tx: 'BaseTransaction', *, only_metadata: bool = False) -> None:
data = self._tx_to_bytes(tx)
key = tx.hash
self._db.put(key, data)
if not only_metadata:
tx_data = self._tx_to_bytes(tx)
self._db.put((self._cf_tx, key), tx_data)
meta_data = self._meta_to_bytes(tx.get_metadata(use_storage=False))
self._db.put((self._cf_meta, key), meta_data)

def transaction_exists(self, hash_bytes: bytes) -> bool:
may_exist, _ = self._db.key_may_exist(hash_bytes)
may_exist, _ = self._db.key_may_exist((self._cf_tx, hash_bytes))
if not may_exist:
return False
tx_exists = self._get_transaction_from_db(hash_bytes) is not None
tx_exists = self._db.get((self._cf_tx, hash_bytes)) is not None
return tx_exists

def _get_transaction(self, hash_bytes: bytes) -> 'BaseTransaction':
Expand All @@ -87,54 +129,56 @@ def _get_transaction(self, hash_bytes: bytes) -> 'BaseTransaction':

def _get_transaction_from_db(self, hash_bytes: bytes) -> Optional['BaseTransaction']:
key = hash_bytes
data = self._db.get(key)
if data is None:
tx_data = self._db.get((self._cf_tx, key))
meta_data = self._db.get((self._cf_meta, key))
if tx_data is None:
return None
tx = self._load_from_bytes(data)
assert meta_data is not None, 'expected metadata to exist when tx exists'
tx = self._load_from_bytes(tx_data, meta_data)
return tx

def _get_tx(self, hash_bytes: bytes, tx_data: bytes) -> 'BaseTransaction':
tx = self.get_transaction_from_weakref(hash_bytes)
if tx is None:
meta_data = self._db.get((self._cf_meta, hash_bytes))
tx = self._load_from_bytes(tx_data, meta_data)
assert tx.hash == hash_bytes
self._save_to_weakref(tx)
return tx

def get_all_transactions(self) -> Iterator['BaseTransaction']:
tx: Optional['BaseTransaction']

items = self._db.iteritems()
items = self._db.iteritems(self._cf_tx)
items.seek_to_first()

def get_tx(hash_bytes, data):
tx = self.get_transaction_from_weakref(hash_bytes)
if tx is None:
tx = self._load_from_bytes(data)
assert tx.hash == hash_bytes
self._save_to_weakref(tx)
return tx

for key, data in items:
hash_bytes = key
for key, tx_data in items:
_, hash_bytes = key

lock = self._get_lock(hash_bytes)
if lock:
with lock:
tx = get_tx(hash_bytes, data)
tx = self._get_tx(hash_bytes, tx_data)
else:
tx = get_tx(hash_bytes, data)
tx = self._get_tx(hash_bytes, tx_data)

assert tx is not None
yield tx

def get_count_tx_blocks(self) -> int:
# XXX: there may be a more efficient way, see: https://stackoverflow.com/a/25775882
keys = self._db.iterkeys()
keys.seek_to_first()
keys_count = sum(1 for _ in keys)
keys_bcount = self._db.get_property(b'rocksdb.estimate-num-keys', self._cf_tx)
keys_count = int(keys_bcount)
return keys_count

def add_value(self, key: str, value: str) -> None:
self.attributes_db.put(key.encode('utf-8'), value.encode('utf-8'))
self._db.put((self._cf_attr, key.encode('utf-8')), value.encode('utf-8'))

def remove_value(self, key: str) -> None:
self.attributes_db.delete(key.encode('utf-8'))
self._db.delete((self._cf_attr, key.encode('utf-8')))

def get_value(self, key: str) -> Optional[str]:
data = self.attributes_db.get(key.encode('utf-8'))
data = self._db.get((self._cf_attr, key.encode('utf-8')))

if data is None:
return None
else:
Expand Down
Loading