Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions hathor/builder/sysctl_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
Sysctl,
WebsocketManagerSysctl,
)
from hathor.sysctl.storage import StorageSysctl


class SysctlBuilder:
Expand All @@ -37,6 +38,7 @@ def build(self) -> Sysctl:

root.put_child('core', core)
root.put_child('p2p', ConnectionsManagerSysctl(self.artifacts.p2p_manager))
root.put_child('storage', StorageSysctl(self.artifacts.rocksdb_storage))

ws_factory = self.artifacts.manager.websocket_factory
if ws_factory is not None:
Expand Down
4 changes: 4 additions & 0 deletions hathor/storage/rocksdb_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ def __init__(
compression=rocksdb.CompressionType.no_compression,
allow_mmap_writes=True, # default is False
allow_mmap_reads=True, # default is already True
# This limits the total size of WAL files (the .log files) in RocksDB.
# When reached, a flush is triggered by RocksDB to free up space.
# This was added because we had cases where these files would accumulate and use too much disk space.
max_total_wal_size=3 * 1024 * 1024 * 1024, # 3GB
)

cf_names: list[bytes]
Expand Down
2 changes: 2 additions & 0 deletions hathor/sysctl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
from hathor.sysctl.core.manager import HathorManagerSysctl
from hathor.sysctl.feature_activation.manager import FeatureActivationSysctl
from hathor.sysctl.p2p.manager import ConnectionsManagerSysctl
from hathor.sysctl.storage.manager import StorageSysctl
from hathor.sysctl.sysctl import Sysctl
from hathor.sysctl.websocket.manager import WebsocketManagerSysctl

__all__ = [
'Sysctl',
'ConnectionsManagerSysctl',
'HathorManagerSysctl',
'StorageSysctl',
'WebsocketManagerSysctl',
'FeatureActivationSysctl',
]
17 changes: 17 additions & 0 deletions hathor/sysctl/storage/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Copyright 2024 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from hathor.sysctl.storage.manager import StorageSysctl

__all__ = ['StorageSysctl']
127 changes: 127 additions & 0 deletions hathor/sysctl/storage/manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
# Copyright 2024 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import TYPE_CHECKING

from hathor.sysctl.sysctl import Sysctl, signal_handler_safe

if TYPE_CHECKING:
from hathor.storage import RocksDBStorage


class StorageSysctl(Sysctl):
    """Sysctl commands for inspecting and managing the RocksDB storage backend.

    Exposes:
    - ``rocksdb.flush`` (setter): manually flush memtables to disk.
    - ``rocksdb.memtable_stats`` (getter): memtable sizes per column family.
    - ``rocksdb.wal_stats`` (getter): on-disk WAL (.log) file sizes.
    """

    def __init__(self, rocksdb_storage: 'RocksDBStorage') -> None:
        super().__init__()

        self.rocksdb_storage = rocksdb_storage
        self.register(
            'rocksdb.flush',
            None,
            self.set_rocksdb_flush,
        )
        self.register(
            'rocksdb.memtable_stats',
            self.get_rocksdb_memtable_stats,
            None,
        )
        self.register(
            'rocksdb.wal_stats',
            self.get_rocksdb_wal_stats,
            None,
        )

    @signal_handler_safe
    def set_rocksdb_flush(self) -> None:
        """Manually trigger a RocksDB flush to persist memtables to disk.

        This forces RocksDB to write all in-memory data (memtables) to SST files on disk.
        Useful for ensuring data persistence or freeing up memory. Flushing also lets
        RocksDB reclaim WAL disk space, since a WAL segment can only be deleted once
        the memtables it covers are persisted.

        This command is best-effort: errors are logged, never raised, so a failing
        flush cannot crash the sysctl handler.
        """
        db = self.rocksdb_storage.get_db()
        try:
            # The flush method may not exist in every python-rocksdb build,
            # hence the explicit AttributeError branch.
            db.flush()
            self.log.info('rocksdb flush completed successfully')
        except AttributeError:
            self.log.error('rocksdb flush method not available in this version of python-rocksdb')
        except Exception as e:
            self.log.error('error during rocksdb flush', error=str(e))

    def get_rocksdb_memtable_stats(self) -> dict[str, float | str | dict[str, float]]:
        """Get memtable statistics for RocksDB.

        Returns a dict with:
        - ``total_size_bytes``: total size of all memtables across all column families in bytes
        - ``size_bytes_per_cf``: memtable size per column family in bytes

        On failure, returns ``{'error': <message>}`` instead (and logs the error).

        Memtable sizes are correlated with WAL sizes: flushing memtables to SST files
        allows RocksDB to reclaim WAL disk space.
        """
        db = self.rocksdb_storage.get_db()

        try:
            # Query the per-column-family 'size-all-mem-tables' property.
            size_bytes_per_cf: dict[str, float] = {}
            for cf in db.column_families:
                cf_size = db.get_property(b'rocksdb.size-all-mem-tables', cf)
                if cf_size:
                    cf_name = cf.name.decode('utf-8')
                    size_bytes_per_cf[cf_name] = float(cf_size.decode('utf-8'))

            # Always return the full schema, even when no column family reported
            # a size, so callers get a consistent shape (matching wal_stats).
            return {
                'size_bytes_per_cf': size_bytes_per_cf,
                'total_size_bytes': float(sum(size_bytes_per_cf.values())),
            }
        except Exception as e:
            self.log.error('error getting rocksdb memtable stats', error=str(e))
            return {'error': str(e)}

    def get_rocksdb_wal_stats(self) -> dict[str, float | str | list[dict[str, str | float]]]:
        """Get WAL (Write-Ahead Log) file statistics for RocksDB.

        Scans the RocksDB data directory for .log files (WAL files) and returns:
        - ``total_size_bytes``: total size of all WAL files in bytes
        - ``file_count``: number of WAL files
        - ``files``: list of dicts with 'name' and 'size_bytes' for each WAL file

        On failure, returns ``{'error': <message>}`` instead (and logs the error).

        This is useful to monitor WAL file accumulation on disk.
        """
        import os

        # NOTE(review): assumes the database lives in '<path>/data_v2.db' —
        # this mirrors the directory layout used by RocksDBStorage; confirm if it changes.
        db_path = os.path.join(self.rocksdb_storage.path, 'data_v2.db')

        try:
            files_info: list[dict[str, str | float]] = []
            total_size_bytes = 0.0

            # Missing directory (e.g. storage not initialized yet) yields empty stats,
            # not an error.
            if os.path.isdir(db_path):
                for entry in os.listdir(db_path):
                    if entry.endswith('.log'):
                        full_path = os.path.join(db_path, entry)
                        size_bytes = float(os.path.getsize(full_path))
                        files_info.append({'name': entry, 'size_bytes': size_bytes})
                        total_size_bytes += size_bytes

            return {
                'total_size_bytes': total_size_bytes,
                'file_count': float(len(files_info)),
                'files': files_info,
            }
        except Exception as e:
            self.log.error('error getting rocksdb wal stats', error=str(e))
            return {'error': str(e)}
149 changes: 149 additions & 0 deletions hathor_tests/sysctl/test_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
import unittest

from hathor.storage import RocksDBStorage
from hathor.sysctl import StorageSysctl


class StorageSysctlTestCase(unittest.TestCase):
    """Tests for StorageSysctl: rocksdb.flush, rocksdb.memtable_stats, rocksdb.wal_stats."""

    def setUp(self) -> None:
        super().setUp()
        self.storage = RocksDBStorage.create_temp()
        # Create a column family so we have more than just 'default'
        self.test_cf = self.storage.get_or_create_column_family(b'test-cf')
        self.sysctl = StorageSysctl(self.storage)

    def tearDown(self) -> None:
        self.storage.close()
        super().tearDown()

    def _write_data(self, num_entries: int = 100) -> None:
        """Write data to RocksDB so memtables and WAL have content."""
        db = self.storage.get_db()
        for i in range(num_entries):
            db.put(f'key-{i}'.encode(), f'value-{i}'.encode())
            db.put((self.test_cf, f'cf-key-{i}'.encode()), f'cf-value-{i}'.encode())

    def test_flush_runs_without_error(self) -> None:
        """Flush on an empty DB should succeed without raising."""
        self.sysctl.unsafe_set('rocksdb.flush', ())

    def test_flush_reduces_memtable_size(self) -> None:
        """After writing data and flushing, memtable size should decrease or stay small."""
        self._write_data(500)

        stats_before = self.sysctl.get('rocksdb.memtable_stats')
        self.sysctl.unsafe_set('rocksdb.flush', ())
        stats_after = self.sysctl.get('rocksdb.memtable_stats')

        # After a flush the memtable size should be <= what it was before.
        # Non-strict comparison: a freshly-swapped empty memtable still has
        # arena overhead, so before/after can be equal in edge cases.
        self.assertLessEqual(
            stats_after.get('total_size_bytes', 0),
            stats_before.get('total_size_bytes', 0),
        )

    # -- memtable_stats tests --

    def test_memtable_stats_returns_total_size_bytes(self) -> None:
        """memtable_stats should include total_size_bytes as a float."""
        self._write_data()

        stats = self.sysctl.get('rocksdb.memtable_stats')

        self.assertIn('total_size_bytes', stats)
        self.assertIsInstance(stats['total_size_bytes'], float)
        self.assertGreater(stats['total_size_bytes'], 0)

    def test_memtable_stats_returns_per_cf_sizes(self) -> None:
        """memtable_stats should include per-column-family memtable sizes."""
        self._write_data()

        stats = self.sysctl.get('rocksdb.memtable_stats')

        self.assertIn('size_bytes_per_cf', stats)
        per_cf = stats['size_bytes_per_cf']
        self.assertIsInstance(per_cf, dict)
        # Both CFs should be present since we wrote to both
        self.assertIn('default', per_cf)
        self.assertIsInstance(per_cf['default'], float)
        self.assertGreater(per_cf['default'], 0)
        self.assertIn('test-cf', per_cf)
        self.assertIsInstance(per_cf['test-cf'], float)
        self.assertGreater(per_cf['test-cf'], 0)

    def test_memtable_stats_includes_created_column_family(self) -> None:
        """The column family we created in setUp should appear in memtable_stats."""
        stats = self.sysctl.get('rocksdb.memtable_stats')

        self.assertIn('size_bytes_per_cf', stats)
        self.assertIn('test-cf', stats['size_bytes_per_cf'])

    def test_memtable_total_size_bytes_equals_sum_of_per_cf(self) -> None:
        """total_size_bytes should be the sum of all per-CF sizes."""
        self._write_data()

        stats = self.sysctl.get('rocksdb.memtable_stats')

        self.assertIn('total_size_bytes', stats)
        self.assertIn('size_bytes_per_cf', stats)
        self.assertAlmostEqual(stats['total_size_bytes'], sum(stats['size_bytes_per_cf'].values()))

    def test_memtable_stats_on_empty_db(self) -> None:
        """memtable_stats on a fresh DB should return without errors."""
        stats = self.sysctl.get('rocksdb.memtable_stats')

        self.assertNotIn('error', stats)
        self.assertIsInstance(stats, dict)

    # -- wal_stats tests --

    def test_wal_stats_returns_total_size_bytes(self) -> None:
        """wal_stats should include total_size_bytes of .log files on disk."""
        self._write_data()

        stats = self.sysctl.get('rocksdb.wal_stats')

        self.assertIn('total_size_bytes', stats)
        self.assertIsInstance(stats['total_size_bytes'], float)
        self.assertGreater(stats['total_size_bytes'], 0)

    def test_wal_stats_returns_file_count(self) -> None:
        """wal_stats should report the number of WAL files."""
        self._write_data()

        stats = self.sysctl.get('rocksdb.wal_stats')

        self.assertIn('file_count', stats)
        self.assertGreaterEqual(stats['file_count'], 1)

    def test_wal_stats_files_list(self) -> None:
        """wal_stats should list individual .log files with name and size."""
        self._write_data()

        stats = self.sysctl.get('rocksdb.wal_stats')

        self.assertIn('files', stats)
        self.assertIsInstance(stats['files'], list)
        for file_info in stats['files']:
            self.assertIn('name', file_info)
            self.assertIn('size_bytes', file_info)
            self.assertTrue(file_info['name'].endswith('.log'))
            self.assertGreater(file_info['size_bytes'], 0)

    def test_wal_stats_total_equals_sum_of_files(self) -> None:
        """total_size_bytes should equal the sum of individual file sizes."""
        self._write_data()

        stats = self.sysctl.get('rocksdb.wal_stats')

        files_sum = sum(f['size_bytes'] for f in stats['files'])
        self.assertAlmostEqual(stats['total_size_bytes'], files_sum)

    def test_wal_stats_on_empty_db(self) -> None:
        """wal_stats on a fresh DB should return valid structure without errors."""
        stats = self.sysctl.get('rocksdb.wal_stats')

        self.assertNotIn('error', stats)
        self.assertIsInstance(stats, dict)
        self.assertIn('total_size_bytes', stats)
        self.assertIn('file_count', stats)
        self.assertIn('files', stats)
2 changes: 1 addition & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.