diff --git a/hathor/transaction/storage/rocksdb_storage.py b/hathor/transaction/storage/rocksdb_storage.py index b280ad750..95932d2c7 100644 --- a/hathor/transaction/storage/rocksdb_storage.py +++ b/hathor/transaction/storage/rocksdb_storage.py @@ -115,7 +115,7 @@ def _start_flush_thread(self) -> None: deferred.addErrback(self._err_flush_thread) self.cache_data.flush_deferred = deferred - def _cb_flush_thread(self) -> None: + def _cb_flush_thread(self, _res: None) -> None: self._reactor.callLater(self.cache_data.interval, self._start_flush_thread) self.cache_data.flush_deferred = None diff --git a/hathor_tests/tx/test_cache_storage.py b/hathor_tests/tx/test_cache_storage.py index 0cb8633d9..dba4e3d97 100644 --- a/hathor_tests/tx/test_cache_storage.py +++ b/hathor_tests/tx/test_cache_storage.py @@ -1,4 +1,9 @@ +from unittest.mock import Mock + +from twisted.internet.task import deferLater + from hathor.daa import TestMode +from hathor.reactor import get_global_reactor from hathor.simulator.utils import add_new_blocks from hathor.transaction import Transaction, TransactionMetadata from hathor.transaction.storage import TransactionRocksDBStorage @@ -123,11 +128,13 @@ def test_flush_thread(self): # Flush deferred is not None self.assertIsNotNone(self.cache_storage.cache_data.flush_deferred) last_flush_deferred = self.cache_storage.cache_data.flush_deferred + + # A call when the deferred already exists, shouldn't override it self.cache_storage._start_flush_thread() self.assertEqual(last_flush_deferred, self.cache_storage.cache_data.flush_deferred) # We flush the cache and flush_deferred becomes None - self.cache_storage._cb_flush_thread() + self.cache_storage._cb_flush_thread(None) self.assertIsNone(self.cache_storage.cache_data.flush_deferred) # After the interval it becomes not None again self.clock.advance(10) @@ -143,6 +150,51 @@ def test_flush_thread(self): del self.cache_storage.cache_data.cache[next(iter(self.cache_storage.cache_data.dirty_txs))] self.cache_storage._flush_to_storage(self.cache_storage.cache_data.dirty_txs.copy()) + async def test_flush_thread_global_reactor(self) -> None: + interval = 1 + reactor = get_global_reactor() + artifacts = self.get_builder() \ + .use_tx_storage_cache(capacity=5) \ + .set_wallet(self._create_test_wallet(unlocked=True)) \ + .set_reactor(reactor) \ + .build() + + self.manager = artifacts.manager + self.cache_storage = self.manager.tx_storage + self.cache_storage.cache_data.interval = interval + + og_start_flush_thread = self.cache_storage._start_flush_thread + og_cb_flush_thread = self.cache_storage._cb_flush_thread + og_err_flush_thread = self.cache_storage._err_flush_thread + + self.cache_storage._start_flush_thread = Mock(wraps=og_start_flush_thread) + self.cache_storage._cb_flush_thread = Mock(wraps=og_cb_flush_thread) + self.cache_storage._err_flush_thread = Mock(wraps=og_err_flush_thread) + + self.manager.start() + + txs = [self._get_new_tx(nonce) for nonce in range(CACHE_SIZE)] + for tx in txs: + self.cache_storage.save_transaction(tx) + + for tx in txs: + assert tx.hash in self.cache_storage.cache_data.dirty_txs + + assert self.cache_storage.cache_data.flush_deferred is None + + assert self.cache_storage._start_flush_thread.call_count == 0 + assert self.cache_storage._cb_flush_thread.call_count == 0 + assert self.cache_storage._err_flush_thread.call_count == 0 + + await deferLater(reactor, interval + 0.1, lambda: None) + + assert self.cache_storage._start_flush_thread.call_count == 1 + assert self.cache_storage._cb_flush_thread.call_count == 1 + assert self.cache_storage._err_flush_thread.call_count == 0 + + assert self.cache_storage.cache_data.flush_deferred is None + self.clean_pending(required_to_quiesce=False) + def test_topological_sort_dfs(self): self.manager.daa.TEST_MODE = TestMode.TEST_ALL_WEIGHT add_new_blocks(self.manager, 11, advance_clock=1)