Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion hathor/transaction/storage/rocksdb_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def _start_flush_thread(self) -> None:
deferred.addErrback(self._err_flush_thread)
self.cache_data.flush_deferred = deferred

def _cb_flush_thread(self) -> None:
def _cb_flush_thread(self, _res: None) -> None:
self._reactor.callLater(self.cache_data.interval, self._start_flush_thread)
self.cache_data.flush_deferred = None

Expand Down
54 changes: 53 additions & 1 deletion hathor_tests/tx/test_cache_storage.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
from unittest.mock import Mock

from twisted.internet.task import deferLater

from hathor.daa import TestMode
from hathor.reactor import get_global_reactor
from hathor.simulator.utils import add_new_blocks
from hathor.transaction import Transaction, TransactionMetadata
from hathor.transaction.storage import TransactionRocksDBStorage
Expand Down Expand Up @@ -123,11 +128,13 @@ def test_flush_thread(self):
# Flush deferred is not None
self.assertIsNotNone(self.cache_storage.cache_data.flush_deferred)
last_flush_deferred = self.cache_storage.cache_data.flush_deferred

# A call when the deferred already exists, shouldn't override it
self.cache_storage._start_flush_thread()
self.assertEqual(last_flush_deferred, self.cache_storage.cache_data.flush_deferred)

# We flush the cache and flush_deferred becomes None
self.cache_storage._cb_flush_thread()
self.cache_storage._cb_flush_thread(None)
self.assertIsNone(self.cache_storage.cache_data.flush_deferred)
# After the interval it becomes not None again
self.clock.advance(10)
Expand All @@ -143,6 +150,51 @@ def test_flush_thread(self):
del self.cache_storage.cache_data.cache[next(iter(self.cache_storage.cache_data.dirty_txs))]
self.cache_storage._flush_to_storage(self.cache_storage.cache_data.dirty_txs.copy())

async def test_flush_thread_global_reactor(self) -> None:
interval = 1
reactor = get_global_reactor()
artifacts = self.get_builder() \
.use_tx_storage_cache(capacity=5) \
.set_wallet(self._create_test_wallet(unlocked=True)) \
.set_reactor(reactor) \
.build()

self.manager = artifacts.manager
self.cache_storage = self.manager.tx_storage
self.cache_storage.cache_data.interval = interval

og_start_flush_thread = self.cache_storage._start_flush_thread
og_cb_flush_thread = self.cache_storage._cb_flush_thread
og_err_flush_thread = self.cache_storage._err_flush_thread

self.cache_storage._start_flush_thread = Mock(wraps=og_start_flush_thread)
self.cache_storage._cb_flush_thread = Mock(wraps=og_cb_flush_thread)
self.cache_storage._err_flush_thread = Mock(wraps=og_err_flush_thread)

self.manager.start()

txs = [self._get_new_tx(nonce) for nonce in range(CACHE_SIZE)]
for tx in txs:
self.cache_storage.save_transaction(tx)

for tx in txs:
assert tx.hash in self.cache_storage.cache_data.dirty_txs

assert self.cache_storage.cache_data.flush_deferred is None

assert self.cache_storage._start_flush_thread.call_count == 0
assert self.cache_storage._cb_flush_thread.call_count == 0
assert self.cache_storage._err_flush_thread.call_count == 0

await deferLater(reactor, interval + 0.1, lambda: None)

assert self.cache_storage._start_flush_thread.call_count == 1
assert self.cache_storage._cb_flush_thread.call_count == 1
assert self.cache_storage._err_flush_thread.call_count == 0

assert self.cache_storage.cache_data.flush_deferred is None
self.clean_pending(required_to_quiesce=False)

def test_topological_sort_dfs(self):
self.manager.daa.TEST_MODE = TestMode.TEST_ALL_WEIGHT
add_new_blocks(self.manager, 11, advance_clock=1)
Expand Down