diff --git a/hathor/builder/sysctl_builder.py b/hathor/builder/sysctl_builder.py index 206547458e..b7073320bb 100644 --- a/hathor/builder/sysctl_builder.py +++ b/hathor/builder/sysctl_builder.py @@ -20,6 +20,7 @@ Sysctl, WebsocketManagerSysctl, ) +from hathor.sysctl.storage import StorageSysctl class SysctlBuilder: @@ -37,6 +38,7 @@ def build(self) -> Sysctl: root.put_child('core', core) root.put_child('p2p', ConnectionsManagerSysctl(self.artifacts.p2p_manager)) + root.put_child('storage', StorageSysctl(self.artifacts.rocksdb_storage)) ws_factory = self.artifacts.manager.websocket_factory if ws_factory is not None: diff --git a/hathor/storage/rocksdb_storage.py b/hathor/storage/rocksdb_storage.py index a92742c0b2..640dbb06de 100644 --- a/hathor/storage/rocksdb_storage.py +++ b/hathor/storage/rocksdb_storage.py @@ -47,6 +47,10 @@ def __init__( compression=rocksdb.CompressionType.no_compression, allow_mmap_writes=True, # default is False allow_mmap_reads=True, # default is already True + # This limits the total size of WAL files (the .log files) in RocksDB. + # When reached, a flush is triggered by RocksDB to free up space. + # This was added because we had cases where these files would accumulate and use too much disk space. 
+ max_total_wal_size=3 * 1024 * 1024 * 1024, # 3GB ) cf_names: list[bytes] diff --git a/hathor/sysctl/__init__.py b/hathor/sysctl/__init__.py index a736376501..c3319903f2 100644 --- a/hathor/sysctl/__init__.py +++ b/hathor/sysctl/__init__.py @@ -15,6 +15,7 @@ from hathor.sysctl.core.manager import HathorManagerSysctl from hathor.sysctl.feature_activation.manager import FeatureActivationSysctl from hathor.sysctl.p2p.manager import ConnectionsManagerSysctl +from hathor.sysctl.storage.manager import StorageSysctl from hathor.sysctl.sysctl import Sysctl from hathor.sysctl.websocket.manager import WebsocketManagerSysctl @@ -22,6 +23,7 @@ 'Sysctl', 'ConnectionsManagerSysctl', 'HathorManagerSysctl', + 'StorageSysctl', 'WebsocketManagerSysctl', 'FeatureActivationSysctl', ] diff --git a/hathor/sysctl/storage/__init__.py b/hathor/sysctl/storage/__init__.py new file mode 100644 index 0000000000..205b08350f --- /dev/null +++ b/hathor/sysctl/storage/__init__.py @@ -0,0 +1,17 @@ +# Copyright 2024 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from hathor.sysctl.storage.manager import StorageSysctl + +__all__ = ['StorageSysctl'] diff --git a/hathor/sysctl/storage/manager.py b/hathor/sysctl/storage/manager.py new file mode 100644 index 0000000000..76d4148681 --- /dev/null +++ b/hathor/sysctl/storage/manager.py @@ -0,0 +1,127 @@ +# Copyright 2024 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import TYPE_CHECKING + +from hathor.sysctl.sysctl import Sysctl, signal_handler_safe + +if TYPE_CHECKING: + from hathor.storage import RocksDBStorage + + +class StorageSysctl(Sysctl): + def __init__(self, rocksdb_storage: 'RocksDBStorage') -> None: + super().__init__() + + self.rocksdb_storage = rocksdb_storage + self.register( + 'rocksdb.flush', + None, + self.set_rocksdb_flush, + ) + self.register( + 'rocksdb.memtable_stats', + self.get_rocksdb_memtable_stats, + None, + ) + self.register( + 'rocksdb.wal_stats', + self.get_rocksdb_wal_stats, + None, + ) + + @signal_handler_safe + def set_rocksdb_flush(self) -> None: + """Manually trigger a RocksDB flush to persist memtables to disk. + + This forces RocksDB to write all in-memory data (memtables) to SST files on disk. + Useful for ensuring data persistence or freeing up memory. 
+ """ + db = self.rocksdb_storage.get_db() + # Flush all column families + # The flush method is available in python-rocksdb + try: + db.flush() + self.log.info('rocksdb flush completed successfully') + except AttributeError: + self.log.error('rocksdb flush method not available in this version of python-rocksdb') + except Exception as e: + self.log.error('error during rocksdb flush', error=str(e)) + + def get_rocksdb_memtable_stats(self) -> dict[str, float | str | dict[str, float]]: + """Get memtable statistics for RocksDB. + + Returns statistics including: + - total_size_bytes: Total size of all memtables across all column families in bytes + - size_bytes_per_cf: Dictionary with memtable size per column family in bytes + + Memtable sizes are correlated with WAL sizes: flushing memtables to SST files + allows RocksDB to reclaim WAL disk space. + """ + db = self.rocksdb_storage.get_db() + result: dict[str, float | str | dict[str, float]] = {} + + try: + # Get memtable size per column family + size_bytes_per_cf: dict[str, float] = {} + for cf in db.column_families: + cf_size = db.get_property(b'rocksdb.size-all-mem-tables', cf) + if cf_size: + cf_name = cf.name.decode('utf-8') + size_bytes_per_cf[cf_name] = float(cf_size.decode('utf-8')) + + if size_bytes_per_cf: + result['size_bytes_per_cf'] = size_bytes_per_cf + result['total_size_bytes'] = sum(size_bytes_per_cf.values()) + + return result + except Exception as e: + self.log.error('error getting rocksdb memtable stats', error=str(e)) + return {'error': str(e)} + + def get_rocksdb_wal_stats(self) -> dict[str, float | str | list[dict[str, str | float]]]: + """Get WAL (Write-Ahead Log) file statistics for RocksDB. + + Scans the RocksDB data directory for .log files (WAL files) and returns: + - total_size_bytes: Total size of all WAL files in bytes + - file_count: Number of WAL files + - files: List of dicts with 'name' and 'size_bytes' for each WAL file + + This is useful to monitor WAL file accumulation on disk. 
+ """ + import os + + db_path = os.path.join(self.rocksdb_storage.path, 'data_v2.db') + result: dict[str, float | str | list[dict[str, str | float]]] = {} + + try: + files_info: list[dict[str, str | float]] = [] + total_size_bytes = 0.0 + + if os.path.isdir(db_path): + for entry in os.listdir(db_path): + if entry.endswith('.log'): + full_path = os.path.join(db_path, entry) + size_bytes = float(os.path.getsize(full_path)) + files_info.append({'name': entry, 'size_bytes': size_bytes}) + total_size_bytes += size_bytes + + result['total_size_bytes'] = total_size_bytes + result['file_count'] = float(len(files_info)) + result['files'] = files_info + + return result + except Exception as e: + self.log.error('error getting rocksdb wal stats', error=str(e)) + return {'error': str(e)} diff --git a/hathor_tests/sysctl/test_storage.py b/hathor_tests/sysctl/test_storage.py new file mode 100644 index 0000000000..0662f9ac49 --- /dev/null +++ b/hathor_tests/sysctl/test_storage.py @@ -0,0 +1,149 @@ +import unittest + +from hathor.storage import RocksDBStorage +from hathor.sysctl import StorageSysctl + + +class StorageSysctlTestCase(unittest.TestCase): + def setUp(self) -> None: + super().setUp() + self.storage = RocksDBStorage.create_temp() + # Create a column family so we have more than just 'default' + self.test_cf = self.storage.get_or_create_column_family(b'test-cf') + self.sysctl = StorageSysctl(self.storage) + + def tearDown(self) -> None: + self.storage.close() + super().tearDown() + + def _write_data(self, num_entries: int = 100) -> None: + """Write data to RocksDB so memtables and WAL have content.""" + db = self.storage.get_db() + for i in range(num_entries): + db.put(f'key-{i}'.encode(), f'value-{i}'.encode()) + db.put((self.test_cf, f'cf-key-{i}'.encode()), f'cf-value-{i}'.encode()) + + def test_flush_runs_without_error(self) -> None: + """Flush on an empty DB should succeed without raising.""" + self.sysctl.unsafe_set('rocksdb.flush', ()) + + def 
test_flush_reduces_memtable_size(self) -> None:
+        """After writing data and flushing, memtable size should decrease or stay small."""
+        self._write_data(500)
+
+        stats_before = self.sysctl.get('rocksdb.memtable_stats')
+        self.sysctl.unsafe_set('rocksdb.flush', ())
+        stats_after = self.sysctl.get('rocksdb.memtable_stats')
+
+        # After a flush the memtable size should be <= what it was before.
+        # Non-strict comparison on purpose: an empty memtable still reports a
+        # fixed overhead via 'rocksdb.size-all-mem-tables', so the size can be
+        # unchanged after a flush and a strict assertLess would be flaky.
+        self.assertLessEqual(
+            stats_after.get('total_size_bytes', 0),
+            stats_before.get('total_size_bytes', 0),
+        )
+
+    # -- memtable_stats tests --
+
+    def test_memtable_stats_returns_total_size_bytes(self) -> None:
+        """memtable_stats should include total_size_bytes as a float."""
+        self._write_data()
+
+        stats = self.sysctl.get('rocksdb.memtable_stats')
+
+        self.assertIn('total_size_bytes', stats)
+        self.assertIsInstance(stats['total_size_bytes'], float)
+        self.assertGreater(stats['total_size_bytes'], 0)
+
+    def test_memtable_stats_returns_per_cf_sizes(self) -> None:
+        """memtable_stats should include per-column-family memtable sizes."""
+        self._write_data()
+
+        stats = self.sysctl.get('rocksdb.memtable_stats')
+
+        self.assertIn('size_bytes_per_cf', stats)
+        per_cf = stats['size_bytes_per_cf']
+        self.assertIsInstance(per_cf, dict)
+        # Both CFs should be present since we wrote to both
+        self.assertIn('default', per_cf)
+        self.assertIsInstance(per_cf['default'], float)
+        self.assertGreater(per_cf['default'], 0)
+        self.assertIn('test-cf', per_cf)
+        self.assertIsInstance(per_cf['test-cf'], float)
+        self.assertGreater(per_cf['test-cf'], 0)
+
+    def test_memtable_stats_includes_created_column_family(self) -> None:
+        """The column family we created in setUp should appear in memtable_stats."""
+        stats = self.sysctl.get('rocksdb.memtable_stats')
+
+        self.assertIn('size_bytes_per_cf', stats)
+        self.assertIn('test-cf', stats['size_bytes_per_cf'])
+
+    def test_memtable_total_size_bytes_equals_sum_of_per_cf(self) -> None:
+        """total_size_bytes should be the sum of all per-CF sizes."""
+        
self._write_data() + + stats = self.sysctl.get('rocksdb.memtable_stats') + + self.assertIn('total_size_bytes', stats) + self.assertIn('size_bytes_per_cf', stats) + self.assertAlmostEqual(stats['total_size_bytes'], sum(stats['size_bytes_per_cf'].values())) + + def test_memtable_stats_on_empty_db(self) -> None: + """memtable_stats on a fresh DB should return without errors.""" + stats = self.sysctl.get('rocksdb.memtable_stats') + + self.assertNotIn('error', stats) + self.assertIsInstance(stats, dict) + + # -- wal_stats tests -- + + def test_wal_stats_returns_total_size_bytes(self) -> None: + """wal_stats should include total_size_bytes of .log files on disk.""" + self._write_data() + + stats = self.sysctl.get('rocksdb.wal_stats') + + self.assertIn('total_size_bytes', stats) + self.assertIsInstance(stats['total_size_bytes'], float) + self.assertGreater(stats['total_size_bytes'], 0) + + def test_wal_stats_returns_file_count(self) -> None: + """wal_stats should report the number of WAL files.""" + self._write_data() + + stats = self.sysctl.get('rocksdb.wal_stats') + + self.assertIn('file_count', stats) + self.assertGreaterEqual(stats['file_count'], 1) + + def test_wal_stats_files_list(self) -> None: + """wal_stats should list individual .log files with name and size.""" + self._write_data() + + stats = self.sysctl.get('rocksdb.wal_stats') + + self.assertIn('files', stats) + self.assertIsInstance(stats['files'], list) + for file_info in stats['files']: + self.assertIn('name', file_info) + self.assertIn('size_bytes', file_info) + self.assertTrue(file_info['name'].endswith('.log')) + self.assertGreater(file_info['size_bytes'], 0) + + def test_wal_stats_total_equals_sum_of_files(self) -> None: + """total_size_bytes should equal the sum of individual file sizes.""" + self._write_data() + + stats = self.sysctl.get('rocksdb.wal_stats') + + files_sum = sum(f['size_bytes'] for f in stats['files']) + self.assertAlmostEqual(stats['total_size_bytes'], files_sum) + + def 
test_wal_stats_on_empty_db(self) -> None: + """wal_stats on a fresh DB should return valid structure without errors.""" + stats = self.sysctl.get('rocksdb.wal_stats') + + self.assertNotIn('error', stats) + self.assertIsInstance(stats, dict) + self.assertIn('total_size_bytes', stats) + self.assertIn('file_count', stats) + self.assertIn('files', stats) diff --git a/poetry.lock b/poetry.lock index 162a533f42..513ba73b94 100644 --- a/poetry.lock +++ b/poetry.lock @@ -2032,7 +2032,7 @@ test = ["pytest"] type = "git" url = "https://github.com/hathornetwork/python-rocksdb.git" reference = "HEAD" -resolved_reference = "9cb62eeae85002b3c0c9bf8d6625fb0d2b6e8a49" +resolved_reference = "f1da1157464db7ed050dff9a66da920394909e7b" [[package]] name = "sentry-sdk"