diff --git a/.gitignore b/.gitignore index 36dff8c..7ea0a15 100644 --- a/.gitignore +++ b/.gitignore @@ -17,3 +17,6 @@ dist/ *.egg-info htmlcov/ + +claude.md +.claude/ diff --git a/docker-compose.dev-miner.yml b/docker-compose.dev-miner.yml new file mode 100644 index 0000000..7c3c5b4 --- /dev/null +++ b/docker-compose.dev-miner.yml @@ -0,0 +1,81 @@ +# Example docker-compose for running tx-mining-service in dev-miner mode. +# +# BEFORE (production-like test setup): +# +# fullnode ──stratum──▶ tx-mining-service ◀──stratum──── cpuminer +# ▲ +# │ HTTP +# test runner +# +# AFTER (dev-miner setup): +# +# fullnode ◀──HTTP──── tx-mining-service (dev-miner mode) +# ▲ +# │ HTTP +# test runner +# +# The cpuminer container is eliminated entirely. tx-mining-service handles +# both block production (background loop) and transaction PoW (in-process). +# +# Usage: +# docker compose -f docker-compose.dev-miner.yml up +# +# To use this from another project's docker-compose, build the image first: +# docker build -t hathornetwork/dev-miner . +# Then reference it in your compose file. + +services: + fullnode: + # This image must include the --test-mode-block-weight flag, which is a + # proposed change to hathor-core that reduces block weight to 1 (bypassing + # the DAA), just like --test-mode-tx-weight does for transactions. + # See: https://github.com/HathorNetwork/hathor-core (branch: feature/test-mode-block-weight) + image: hathor-core:test-mode-block-weight + command: [ + "run_node", + "--status", "8080", + # Reduce transaction weight to ~1 (existing flag, already used in tests). + "--test-mode-tx-weight", + # Reduce block weight to 1 (NEW flag from this RFC). Without this, blocks + # still require full DAA weight, making in-process mining impractical. 
+ "--test-mode-block-weight", + "--allow-mining-without-peers", + "--unsafe-mode", "nano-testnet-bravo", + "--data", "./tmp", + ] + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8080/v1a/status"] + interval: 5s + timeout: 5s + retries: 10 + ports: + - "8080:8080" + networks: + - hathor-privnet + + tx-mining-service: + platform: linux/amd64 + build: . + depends_on: + fullnode: + condition: service_healthy + ports: + - "8035:8035" + command: [ + "http://fullnode:8080", + # Enables dev-miner mode (RunDevService instead of RunService). + "--dev-miner", + "--api-port=8035", + # Produce one block per second. This gives tests a predictable cadence + # for reward lock releases and confirmation accumulation. + "--block-interval=1000", + "--testnet", + # Mining rewards go to this address. In test environments, this is + # typically a pre-generated address from the test wallet. + "--address", "WTjhJXzQJETVx7BVXdyZmvk396DRRsubdw", + ] + networks: + - hathor-privnet + +networks: + hathor-privnet: diff --git a/tests/test_dev_miner.py b/tests/test_dev_miner.py new file mode 100644 index 0000000..fcc25df --- /dev/null +++ b/tests/test_dev_miner.py @@ -0,0 +1,405 @@ +""" +Copyright (c) Hathor Labs and its affiliates. + +This source code is licensed under the MIT license found in the +LICENSE file in the root directory of this source tree. + +Tests for the dev-miner mode. + +These tests validate the three core components of the dev-miner: + +1. solve_tx / solve_block — brute-force PoW solvers that replace stratum miners +2. DevMiningManager — drop-in replacement for TxMiningManager, tested through + the same HTTP API endpoints that production uses (submit-job, job-status, etc.) +3. 
+ BlockMiner — background block producer, tested for interval regularity under + both trivial and slow mining conditions +""" + +import asyncio +import time as _time +import unittest +from unittest.mock import AsyncMock, MagicMock, patch + +from aiohttp.test_utils import AioHTTPTestCase, unittest_run_loop +from hathorlib.base_transaction import tx_or_block_from_bytes + +import txstratum.time +from txstratum.api import App +from txstratum.dev.block_miner import BlockMiner, solve_block +from txstratum.dev.manager import DevMiningManager +from txstratum.dev.tx_miner import solve_tx + +# Same TX1_DATA from test_api.py — a serialized transaction that we use as a +# realistic input for PoW solving. Its weight is ~18.66, so without +# --test-mode-tx-weight it needs ~2^18.66 (~400k) nonce tries (a few seconds). +TX1_DATA = bytes.fromhex( + "0001000102000000000000089c0d40a9b1edfb499bc624833fde87ae459d495000393f4aaa00006" + "a473045022100c407d5e8f411f9ae582ebd7acbfcb6ea6170332709fb69acaa34c1b426f1d8f502" + "2003847963768eca9bcdf46e758319fb2699fd28ab657d00f54bef46c37a90405e2103755f2920f" + "f7dc32dc5414cea1cf9e078347f40894caf0c03637d083dbb261c5c000003e800001976a914a04c" + "9e2a0291f53c618fdad2ecb37748efb0eeeb88ac0000151800001976a914545f1156a3b00df622b" + "1d92968c21b962e9d7aa588ac4032a8228c4020c35ed18547020000000047c9881d2bf348d5ffd6" + "ce8398d6bc5d17b3bea75a53c15b7480be950000006ed5794bf69ebe7d7d75e7a0024d98acb85cb" + "9c101b59b8b6073e8667c84e2ee77" +) + + +def update_timestamp(tx_bytes: bytes, *, delta: int = 0) -> bytes: + """Update timestamp to current timestamp. + + The fullnode rejects transactions with timestamps too far from the current + time, so we refresh the timestamp before each test. 
class TestDevMiningManager(AioHTTPTestCase):
    """Test DevMiningManager through the same HTTP API used in production.

    These tests exercise the full job lifecycle: submit-job → poll job-status →
    done. They prove that DevMiningManager is a valid drop-in for TxMiningManager
    without any API-level changes.

    The backend is a MagicMock whose ``get_tx_parents`` and
    ``push_tx_or_block`` are AsyncMocks, so no fullnode is required.
    """

    __test__ = True

    # Polling budget for a job to reach "done": 50 attempts x 0.1s = 5s.
    POLL_ATTEMPTS = 50
    POLL_INTERVAL_S = 0.1

    async def get_application(self):
        """Build the aiohttp app around a started DevMiningManager."""
        self.backend = MagicMock()
        self.backend.get_tx_parents = AsyncMock(
            return_value=[b"\x00" * 32, b"\x01" * 32]
        )
        self.backend.push_tx_or_block = AsyncMock(return_value=True)
        self.manager = DevMiningManager(backend=self.backend)
        await self.manager.start()
        self.healthcheck = MagicMock()
        self.myapp = App(self.manager, self.healthcheck)
        return self.myapp.app

    async def tearDownAsync(self):
        """Stop the manager started in get_application().

        Without this, mining tasks spawned by submit-job (some tests never
        wait for them) are still pending when the test loop is torn down.
        """
        await self.manager.stop()
        await super().tearDownAsync()

    async def _submit_job(self, payload):
        """POST ``payload`` to /submit-job and return (response, parsed JSON)."""
        resp = await self.client.request("POST", "/submit-job", json=payload)
        data = await resp.json()
        return resp, data

    async def _poll_job(self, job_id):
        """Poll /job-status until the job is "done" or the budget runs out.

        Mining runs in an async task (via run_in_executor), so we need to
        give it time to finish. Returns the last (response, parsed JSON);
        callers assert on the final status themselves.
        """
        resp = None
        data = None
        for _ in range(self.POLL_ATTEMPTS):
            await asyncio.sleep(self.POLL_INTERVAL_S)
            resp = await self.client.request(
                "GET", "/job-status", params={"job-id": job_id}
            )
            data = await resp.json()
            if data["status"] == "done":
                break
        return resp, data

    @unittest_run_loop
    async def test_submit_and_poll_job(self):
        """Basic lifecycle: submit a tx, poll until solved."""
        tx_hex = update_timestamp(TX1_DATA).hex()
        resp, data = await self._submit_job({"tx": tx_hex})
        self.assertEqual(200, resp.status)
        job_id = data["job_id"]
        self.assertIn("status", data)
        # expected_total_time is 0 because there's no queue or stratum latency.
        self.assertEqual(0, data["expected_total_time"])

        resp, data = await self._poll_job(job_id)

        self.assertEqual(200, resp.status)
        self.assertEqual("done", data["status"])
        self.assertIsNotNone(data["tx"]["nonce"])

    @unittest_run_loop
    async def test_submit_job_with_parents(self):
        """Test that add_parents=True fetches parents from the fullnode before mining."""
        tx_hex = update_timestamp(TX1_DATA).hex()
        resp, data = await self._submit_job({"tx": tx_hex, "add_parents": True})
        self.assertEqual(200, resp.status)

        _, data = await self._poll_job(data["job_id"])

        self.assertEqual("done", data["status"])
        self.assertEqual(2, len(data["tx"]["parents"]))

    @unittest_run_loop
    async def test_submit_job_with_propagate(self):
        """Test that propagate=True pushes the solved tx to the fullnode."""
        tx_hex = update_timestamp(TX1_DATA).hex()
        resp, data = await self._submit_job({"tx": tx_hex, "propagate": True})
        self.assertEqual(200, resp.status)

        _, data = await self._poll_job(data["job_id"])

        # Fail with a clear message if the job timed out, instead of a
        # confusing "push not called" assertion.
        self.assertEqual("done", data["status"])
        self.backend.push_tx_or_block.assert_called_once()

    @unittest_run_loop
    async def test_health_check(self):
        """Health endpoint works unchanged with DevMiningManager."""
        health_check_result = MagicMock()
        health_check_result.get_http_status_code.return_value = 200
        health_check_result.to_json.return_value = {"status": "pass"}

        async def side_effect():
            return health_check_result

        self.healthcheck.get_health_check.side_effect = side_effect

        resp = await self.client.request("GET", "/health")
        self.assertEqual(200, resp.status)
        data = await resp.json()
        self.assertEqual({"status": "pass"}, data)

    @unittest_run_loop
    async def test_mining_status(self):
        """mining-status endpoint exposes the dev_miner flag.

        Clients can use this flag to detect that the service is running in
        dev-miner mode (e.g., to adjust timeout expectations).
        """
        resp = await self.client.request("GET", "/mining-status")
        self.assertEqual(200, resp.status)
        data = await resp.json()
        self.assertTrue(data["dev_miner"])
        # No stratum miners in dev-miner mode.
        self.assertEqual(0, len(data["miners"]))

    @unittest_run_loop
    async def test_manager_status(self):
        """DevMiningManager.status() and health-related methods."""
        status = self.manager.status()
        self.assertTrue(status["dev_miner"])
        # has_any_miner() always returns True — the dev-miner itself is the miner.
        self.assertTrue(self.manager.has_any_miner())
        self.assertTrue(self.manager.has_any_submitted_job_in_period(3600))

    @unittest_run_loop
    async def test_duplicate_job_submission(self):
        """Submitting the same tx twice returns the existing job (same as production)."""
        tx_hex = update_timestamp(TX1_DATA).hex()

        resp1, data1 = await self._submit_job({"tx": tx_hex})
        self.assertEqual(200, resp1.status)

        resp2, data2 = await self._submit_job({"tx": tx_hex})
        self.assertEqual(200, resp2.status)

        self.assertEqual(data1["job_id"], data2["job_id"])
class TestBlockMinerInterval(AioHTTPTestCase):
    """Test that BlockMiner produces blocks at a steady cadence.

    This is the key property that makes the dev-miner useful for integration
    tests: blocks arrive at predictable intervals, so tests can reason about
    when reward locks release and confirmations accumulate.

    Two scenarios are tested:
    1. Trivial PoW (--test-mode-block-weight): solve_block is instant, so the
       interval is entirely determined by the sleep.
    2. Slow PoW (no --test-mode-block-weight): solve_block takes a significant
       fraction of the interval. The timing compensation in BlockMiner._run()
       (sleep for interval minus elapsed time) should keep block-to-block time
       close to the configured interval.
    """

    __test__ = True

    async def get_application(self):
        """Return an empty app; the base class is only used for its event loop."""
        from aiohttp import web

        return web.Application()

    def _make_miner(self, block_interval_ms, timestamps):
        """Create a BlockMiner with a mocked backend that records push timestamps."""
        template = MagicMock()
        template.data = BLOCK_DATA
        template.height = 1

        async def record_push(*args, **kwargs):
            timestamps.append(_time.monotonic())

        backend = MagicMock()
        backend.get_block_template = AsyncMock(return_value=template)
        backend.push_tx_or_block = AsyncMock(side_effect=record_push)

        return BlockMiner(backend=backend, block_interval_ms=block_interval_ms)

    async def _wait_for_blocks(self, timestamps, num_blocks, timeout_s=5.0):
        """Poll until enough block timestamps are recorded or timeout."""
        deadline = _time.monotonic() + timeout_s
        while len(timestamps) < num_blocks and _time.monotonic() < deadline:
            await asyncio.sleep(0.02)

    async def _mine_blocks(self, miner, timestamps, num_blocks):
        """Run ``miner`` until ``num_blocks`` pushes are recorded, then stop it.

        The stop is in a ``finally`` so the background mining task never
        outlives the test, even if the wait is interrupted.
        """
        await miner.start()
        try:
            await self._wait_for_blocks(timestamps, num_blocks)
        finally:
            await miner.stop()

    def _assert_regular_intervals(
        self, timestamps, num_blocks, block_interval_ms, tolerance_pct=0.40
    ):
        """Assert that block-to-block intervals are within tolerance of the target."""
        self.assertGreaterEqual(
            len(timestamps),
            num_blocks,
            f"Expected at least {num_blocks} blocks but only got {len(timestamps)}",
        )

        # Only the first num_blocks pushes are checked; any extra blocks mined
        # before stop() are ignored.
        checked = timestamps[:num_blocks]
        intervals_ms = [
            (later - earlier) * 1000 for earlier, later in zip(checked, checked[1:])
        ]
        tolerance_ms = block_interval_ms * tolerance_pct

        for i, interval in enumerate(intervals_ms):
            self.assertAlmostEqual(
                interval,
                block_interval_ms,
                delta=tolerance_ms,
                msg=(
                    f"Interval block {i + 1} -> {i + 2}: {interval:.0f}ms "
                    f"(expected ~{block_interval_ms}ms +/-{tolerance_ms:.0f}ms)"
                ),
            )

    @unittest_run_loop
    async def test_blocks_mined_at_regular_intervals_trivial_weight(self):
        """With trivial block weight (--test-mode-block-weight), solve_block
        returns almost instantly, so the interval is purely sleep-driven.
        """
        block_interval_ms = 200
        num_blocks = 5
        timestamps = []

        miner = self._make_miner(block_interval_ms, timestamps)

        # Mock solve_block to return immediately (simulates weight ~1).
        with patch("txstratum.dev.block_miner.solve_block", return_value=True):
            await self._mine_blocks(miner, timestamps, num_blocks)

        self._assert_regular_intervals(timestamps, num_blocks, block_interval_ms)

    @unittest_run_loop
    async def test_blocks_mined_at_regular_intervals_slow_mining(self):
        """Even when solve_block takes 60-90% of the interval (simulating
        environments without --test-mode-block-weight), the timing compensation
        in _run() keeps block-to-block time close to the configured interval.

        This proves the `remaining = interval - elapsed` logic works correctly.
        """
        block_interval_ms = 200
        num_blocks = 5
        timestamps = []
        call_count = 0

        # Simulate mining that takes 60-90% of the interval.
        mining_delays_s = [0.15, 0.18, 0.12, 0.16, 0.14, 0.17]

        def slow_solve_block(block):
            nonlocal call_count
            # Reuse the last delay once the list is exhausted.
            delay = mining_delays_s[min(call_count, len(mining_delays_s) - 1)]
            call_count += 1
            _time.sleep(delay)
            return True

        miner = self._make_miner(block_interval_ms, timestamps)

        with patch(
            "txstratum.dev.block_miner.solve_block", side_effect=slow_solve_block
        ):
            await self._mine_blocks(miner, timestamps, num_blocks)

        self._assert_regular_intervals(timestamps, num_blocks, block_interval_ms)
class RunDevService:
    """Alternative service runner for dev/test environments.

    This is the counterpart to RunService (the production runner). When the
    user passes --dev-miner on the CLI, main() instantiates this class instead
    of RunService.

    Key differences from RunService:
    - No stratum server — no StratumProtocol, no PubSubManager, no create_server
    - No tx_filters, no prometheus — features that don't apply to test environments
    - Uses DevMiningManager (in-process tx mining) instead of TxMiningManager
    - Adds a BlockMiner (in-process block production) — in production, cpuminer
      fills this role by connecting to the stratum server

    The HTTP API (txstratum/api.py) is started identically to RunService,
    because DevMiningManager implements the same interface as TxMiningManager.
    From the API's perspective, nothing changes.
    """

    def __init__(self, args: Namespace) -> None:
        """Initialize the dev-miner service from the parsed CLI arguments."""
        from hathorlib.client import HathorClient
        from hathorlib.conf import HathorSettings

        from txstratum.dev.block_miner import BlockMiner
        from txstratum.dev.manager import DevMiningManager
        from txstratum.healthcheck.healthcheck import HealthCheck

        self.settings = HathorSettings()
        self.args = args

        RunService.configure_logging(args)

        self.loop: AbstractEventLoop = asyncio.get_event_loop()

        self.backend: HathorClient = HathorClient(args.backend)

        # DevMiningManager replaces TxMiningManager — handles tx PoW in-process.
        self.manager: DevMiningManager = DevMiningManager(backend=self.backend)

        # BlockMiner replaces cpuminer — produces blocks at a steady interval.
        self.block_miner: BlockMiner = BlockMiner(
            backend=self.backend,
            address=args.address,
            block_interval_ms=args.block_interval,
        )

        # HealthCheck accepts the manager interface, works with both managers.
        self.health_check: HealthCheck = HealthCheck(self.manager, self.backend)

        # Set once the first shutdown signal arrives, so that a second
        # SIGINT/SIGTERM does not schedule a concurrent _shutdown().
        self._shutting_down = False

    def execute(self) -> None:
        """Run the dev-miner service.

        Startup order matters: backend must connect before the manager starts
        accepting jobs, and the manager must be running before the BlockMiner
        starts producing blocks (otherwise submitted blocks could fail).

        Blocks in ``loop.run_forever()`` until ``_shutdown`` stops the loop.
        """
        from txstratum.api import App

        self.loop.run_until_complete(self.backend.start())
        self.loop.run_until_complete(self.manager.start())
        self.loop.run_until_complete(self.block_miner.start())

        # The App constructor accepts any object with the TxMiningManager
        # interface. DevMiningManager satisfies this — no API code changes.
        # Note: tx_filters (ban lists, TOI) are omitted because they don't
        # apply to test environments. They could be wired up here for
        # compatibility, but there's no current use case for filtering in
        # dev-miner mode.
        api_app = App(
            self.manager,
            self.health_check,
            max_tx_weight=self.args.max_tx_weight,
            max_timestamp_delta=self.args.max_timestamp_delta,
            tx_timeout=self.args.tx_timeout,
            fix_invalid_timestamp=self.args.fix_invalid_timestamp,
            only_standard_script=not self.args.allow_non_standard_script,
        )

        logger.info(
            "Dev-miner mode enabled",
            backend=self.args.backend,
            address=self.args.address,
            block_interval_ms=self.args.block_interval,
            api_port=self.args.api_port,
            network=self.settings.NETWORK_NAME,
        )

        # Kept on the instance so _shutdown() can release the listening
        # socket and run the app's cleanup hooks.
        self.web_runner = web.AppRunner(api_app.app, logger=logger)
        self.loop.run_until_complete(self.web_runner.setup())
        site = web.TCPSite(self.web_runner, "0.0.0.0", self.args.api_port)
        self.loop.run_until_complete(site.start())

        self.register_signal_handlers()

        logger.info("TxMining API running at 0.0.0.0:{}...".format(self.args.api_port))
        self.loop.run_forever()

    # Signal handling methods are intentionally duplicated from RunService to
    # avoid modifying production code in this first iteration. A future
    # improvement could extract them into a shared base class or mixin.

    def handle_shutdown_signal(self, signal: str) -> None:
        """Handle a shutdown signal by scheduling ``_shutdown`` exactly once."""
        logger.info(f"{signal} received.")
        if self._shutting_down:
            # A second Ctrl-C must not start a second, concurrent shutdown.
            logger.info("Shutdown already in progress, ignoring signal.")
            return
        self._shutting_down = True
        self.loop.create_task(self._shutdown())

    def register_signal_handlers(self) -> None:
        """Register SIGTERM/SIGINT handlers on the event loop."""
        import signal

        logger.info("Registering signal handlers...")

        # getattr: not every platform defines every signal constant.
        sigterm = getattr(signal, "SIGTERM", None)
        if sigterm is not None:
            self.loop.add_signal_handler(
                sigterm, lambda: self.handle_shutdown_signal("SIGTERM")
            )

        sigint = getattr(signal, "SIGINT", None)
        if sigint is not None:
            self.loop.add_signal_handler(
                sigint, lambda: self.handle_shutdown_signal("SIGINT")
            )

    async def _shutdown(self) -> None:
        """Shutdown the dev-miner service.

        Simpler than RunService._shutdown — no graceful drain needed because
        tx mining is near-instant (no stratum round-trip to wait for).

        The event loop is stopped in a ``finally`` so that a failure in any
        stop step cannot leave the process stuck in ``run_forever()``.
        """
        logger.info("Shutting down dev-miner...")
        try:
            self.manager.shutdown()
            # web_runner only exists once execute() has set up the HTTP API.
            web_runner = getattr(self, "web_runner", None)
            if web_runner is not None:
                await web_runner.cleanup()
            await self.block_miner.stop()
            await self.manager.stop()
            await self.backend.stop()
        finally:
            self.loop.stop()
+ +# Dev-miner package: lightweight mining for dev/test environments. +# +# This package replaces the production mining stack (stratum server + cpuminer) +# with in-process mining. The motivation is to simplify the integration test +# infrastructure for hathor-wallet-lib, which previously required four Docker +# containers (fullnode + tx-mining-service + cpuminer + test runner) and suffered +# from non-deterministic block timing and wasted CPU on CI. +# +# Architecture (see also the RFC "Simplified Mining for Integration Tests"): +# +# tx_miner.py - Solves transaction PoW by brute-forcing nonces. With +# --test-mode-tx-weight on the fullnode, weight is ~1, so +# the first nonce almost always works. +# +# block_miner.py - Background async loop that produces blocks at a steady +# cadence (configurable via --block-interval). Replaces +# cpuminer entirely. +# +# manager.py - Drop-in replacement for TxMiningManager. Implements the +# same interface consumed by the HTTP API layer, so the API +# code doesn't need any changes. + +# Full 32-bit nonce space, shared by both tx_miner and block_miner. +# Even at standard weight (~22), the expected number of iterations is ~2^22 +# (~4M), well within the 2^32 (~4B) range. With --test-mode-tx-weight or +# --test-mode-block-weight (weight ~1), ~50% of nonces are valid. +MAX_NONCE = 2**32 diff --git a/txstratum/dev/block_miner.py b/txstratum/dev/block_miner.py new file mode 100644 index 0000000..b88bae1 --- /dev/null +++ b/txstratum/dev/block_miner.py @@ -0,0 +1,196 @@ +# Copyright (c) Hathor Labs and its affiliates. +# +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + +"""Background block miner for dev/test environments. + +In production, blocks are mined by cpuminer connecting to the stratum server: + + fullnode ──stratum──▶ tx-mining-service ◀──stratum──── cpuminer + +cpuminer has no concept of "block interval" — it mines as fast as it can. 
class BlockMiner:
    """Background block miner that replaces cpuminer + stratum for dev/test.

    Key differences from the production mining path:
    - No stratum protocol — communicates directly with the fullnode via HTTP
    - Configurable block interval — produces blocks at a steady cadence
    - Single-threaded — no pool of miners, just one solve loop
    """

    def __init__(
        self,
        backend: "HathorClient",
        address: Optional[str] = None,
        block_interval_ms: int = 1000,
    ):
        """Initialize the block miner.

        Args:
            backend: client used to fetch block templates and push blocks.
            address: passed through to ``get_block_template``; may be None.
            block_interval_ms: target block-to-block time in milliseconds.

        Raises:
            ValueError: if ``block_interval_ms`` is not positive.
        """
        if block_interval_ms <= 0:
            raise ValueError(
                f"block_interval_ms must be positive, got {block_interval_ms}"
            )
        self.log = logger.new()
        self.backend = backend
        self.address = address
        # Keep the configured integer for logging: recomputing it from the
        # float seconds value can truncate (e.g. 1001 -> 1000).
        self.block_interval_ms = block_interval_ms
        self.block_interval_s = block_interval_ms / 1000.0
        self.blocks_found = 0
        self._running = False
        self._task: Optional[asyncio.Task[None]] = None

    async def start(self) -> None:
        """Start the block mining loop.

        Calling this while the loop is already running is a no-op, so a
        second call cannot orphan the first background task.
        """
        if self._task is not None and not self._task.done():
            self.log.warning("Block miner already running")
            return
        self._running = True
        self._task = asyncio.ensure_future(self._run())
        self.log.info(
            "Block miner started",
            address=self.address,
            block_interval_ms=self.block_interval_ms,
        )

    async def stop(self) -> None:
        """Stop the block mining loop and wait for the task to finish."""
        self._running = False
        task, self._task = self._task, None
        if task is not None:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
        self.log.info("Block miner stopped", blocks_found=self.blocks_found)

    async def _wait_for_fullnode(self) -> None:
        """Wait until the fullnode is ready to serve block templates.

        We use get_block_template (not the /health endpoint) because the
        fullnode may report healthy before it's actually ready to produce
        block templates. This was a source of startup race conditions.
        """
        while self._running:
            try:
                # The template itself is discarded; this is only a probe.
                await self.backend.get_block_template(address=self.address)
            except (ClientError, OSError, RuntimeError):
                self.log.info("Waiting for fullnode to become available...")
                await asyncio.sleep(1)
            else:
                self.log.info("Fullnode is ready")
                return

    async def _run(self) -> None:
        """Run the main mining loop.

        The timing compensation logic (sleep for interval minus elapsed time)
        ensures that blocks are produced at a steady cadence regardless of how
        long the PoW solve takes. See tests/test_dev_miner.py for validation
        of this behavior with both trivial and slow mining.
        """
        await self._wait_for_fullnode()

        while self._running:
            cycle_start = txstratum.time.time()

            try:
                await self._mine_one_block()
            except asyncio.CancelledError:
                # Propagate cancellation, matching what happens when the
                # cancel lands in one of the sleeps; stop() awaits the task
                # and handles CancelledError.
                raise
            except Exception:
                self.log.exception("Error in block mining loop")
                await asyncio.sleep(1)
                continue

            # Timing compensation: sleep only the remaining time after solving,
            # so block-to-block interval stays close to the configured value.
            elapsed = txstratum.time.time() - cycle_start
            remaining = self.block_interval_s - elapsed
            if remaining > 0:
                await asyncio.sleep(remaining)

    async def _mine_one_block(self) -> None:
        """Fetch a block template, solve it, and submit it.

        The PoW solve is offloaded to a thread executor because even though
        it's trivial with --test-mode-block-weight, it's still CPU-bound work
        that would block the event loop (and delay tx mining).

        A failed solve or a PushTxFailed rejection is logged and the method
        returns without incrementing ``blocks_found``.
        """
        start = txstratum.time.time()

        template = await self.backend.get_block_template(address=self.address)
        block = Block.create_from_struct(template.data)

        # solve_block is looked up at call time, so tests can patch
        # txstratum.dev.block_miner.solve_block.
        loop = asyncio.get_running_loop()
        solved = await loop.run_in_executor(None, solve_block, block)
        if not solved:
            self.log.error("Failed to solve block", height=template.height)
            return

        # Measured before the push, so it covers template fetch + solve only.
        elapsed_ms = (txstratum.time.time() - start) * 1000

        try:
            await self.backend.push_tx_or_block(bytes(block))
        except PushTxFailed:
            self.log.exception(
                "Block rejected by fullnode",
                height=template.height,
                hash=block.hash_hex,
            )
            return

        self.blocks_found += 1
        self.log.info(
            "Block mined",
            height=template.height,
            hash=block.hash_hex,
            weight=block.weight,
            nonce=block.nonce,
            time_ms=f"{elapsed_ms:.1f}",
        )
class DevMiningManager:
    """Drop-in replacement for TxMiningManager that mines transactions in-process.

    Responsibilities split between this class and BlockMiner:
    - DevMiningManager: handles transaction PoW (via solve_tx)
    - BlockMiner: handles block production (separate async loop in block_miner.py)

    The production TxMiningManager handles both in a single class because the
    stratum protocol intermixes block and tx jobs on the same miner connections.
    Here we separate them because there's no stratum — tx mining is request-driven
    (triggered by HTTP submit-job) while block mining is time-driven (periodic loop).

    Job lifecycle: add_job() registers the job in ``tx_jobs`` and starts one
    asyncio task for it. The task finishes with the job marked solved or
    failed, after which the job is kept in ``tx_jobs`` for
    TX_CLEAN_UP_INTERVAL seconds so that it can still be looked up.
    """

    TX_CLEAN_UP_INTERVAL = 300.0  # seconds

    def __init__(self, backend: "HathorClient"):
        """Initialize the dev mining manager.

        `backend` is the fullnode client used to fetch tx parents and to push
        solved transactions.
        """
        self.log = logger.new()
        self.backend = backend
        self.started_at: float = 0
        # Jobs known to the manager, keyed by job uuid.
        self.tx_jobs: Dict[bytes, TxJob] = {}
        # In-flight mining tasks, keyed by job uuid.
        self._tasks: Dict[bytes, asyncio.Task[None]] = {}
        self.refuse_new_jobs = False

        # Statistics — same fields as TxMiningManager for API compatibility.
        self.txs_solved: int = 0
        self.txs_timeout: int = 0
        self.txs_failed: int = 0

    async def start(self) -> None:
        """Start the manager (records the start time used by `uptime`)."""
        self.started_at = txstratum.time.time()
        self.log.info("DevMiningManager started")

    async def stop(self) -> None:
        """Stop the manager: cancel every in-flight mining task and wait for them."""
        pending = list(self._tasks.values())
        for task in pending:
            task.cancel()
        # return_exceptions=True collects each task's CancelledError (or any
        # other exception) instead of raising it out of stop().
        await asyncio.gather(*pending, return_exceptions=True)
        self._tasks.clear()
        self.log.info("DevMiningManager stopped")

    @property
    def uptime(self) -> float:
        """Return the manager uptime in seconds (0.0 if it was never started)."""
        if not self.started_at:
            return 0.0
        return txstratum.time.time() - self.started_at

    def get_job(self, uuid: bytes) -> Optional[TxJob]:
        """Return the TxJob related to the uuid, or None if it is not tracked."""
        return self.tx_jobs.get(uuid)

    def status(self) -> Dict[Any, Any]:
        """Return status dict compatible with the production TxMiningManager.

        All stratum-specific fields (miners, hashrate, block_template) are
        hardcoded to empty/zero values since they don't apply in dev-miner
        mode. The `dev_miner: True` flag lets clients detect the mode.
        """
        return {
            "miners": [],
            "miners_count": 0,
            "total_hashrate_ghs": 0,
            "started_at": self.started_at,
            "txs_solved": self.txs_solved,
            "txs_timeout": self.txs_timeout,
            # Block mining is handled by BlockMiner (separate component);
            # this manager only tracks transaction mining statistics.
            "blocks_found": 0,
            "uptime": self.uptime,
            "tx_queue": 0,
            "tx_jobs": [job.to_dict() for job in self.tx_jobs.values()],
            "block_template_error": 0,
            "block_template": None,
            "dev_miner": True,
        }

    def add_job(self, job: TxJob) -> bool:
        """Add a new transaction to be mined.

        Unlike TxMiningManager (which queues the job and waits for an available
        stratum miner), this kicks off an async task that solves the PoW directly.
        The job transitions through the same states (MINING → DONE/FAILED) that
        the API polling endpoint expects.

        Must be called while an event loop is running (a task is created).

        Returns True once the job has been registered.
        Raises NewJobRefused after shutdown() was called, and JobAlreadyExists
        when a job with the same uuid is tracked and has not timed out.
        """
        if self.refuse_new_jobs:
            raise NewJobRefused

        # Handle duplicate submissions the same way as the production manager:
        # allow resubmission if the previous attempt timed out, reject otherwise.
        if job.uuid in self.tx_jobs:
            prev_job = self.tx_jobs[job.uuid]
            if prev_job.status == JobStatus.TIMEOUT:
                self.tx_jobs.pop(job.uuid)
            else:
                raise JobAlreadyExists

        self.tx_jobs[job.uuid] = job

        # Expected times are 0 because there's no queue and no stratum latency.
        job.expected_mining_time = 0
        job.expected_queue_time = 0

        self.log.info("New TxJob (dev-miner)", job_id=job.uuid.hex())

        # Kick off mining as an async task. If the job needs parents (i.e. the
        # caller sent add_parents=True), we fetch them from the fullnode first.
        if job.add_parents:
            task = asyncio.create_task(self._mine_with_parents(job))
        else:
            task = asyncio.create_task(self._mine_job(job))
        self._tasks[job.uuid] = task
        # The callback receives the finished task itself, which lets
        # _forget_task drop the entry only if it still belongs to that task.
        task.add_done_callback(lambda done: self._forget_task(job.uuid, done))

        return True

    def _forget_task(self, uuid: bytes, task: "asyncio.Task[None]") -> None:
        """Drop the finished task from `_tasks` if it is still the one registered.

        A job can be cancelled and a new job with the same uuid submitted
        before the old task's done-callback runs. Removing by uuid alone would
        then discard the new task, so stop() and cancel_job() could no longer
        reach it.
        """
        if self._tasks.get(uuid) is task:
            del self._tasks[uuid]

    def _fail_job(self, job: TxJob, message: str) -> None:
        """Mark the job as failed, count the failure and schedule its removal."""
        job.mark_as_failed(message)
        self.txs_failed += 1
        self._schedule_cleanup(job)

    async def _mine_with_parents(self, job: TxJob) -> None:
        """Fetch parent transactions from the fullnode, then mine.

        In Hathor, every transaction must reference parent transactions (similar
        to a DAG tip selection). The fullnode provides suitable parents via the
        get_tx_parents API. In production, TxMiningManager does this inline
        before dispatching to a stratum miner.

        If fetching the parents fails, the job is marked as failed and no
        mining is attempted.
        """
        job.status = JobStatus.GETTING_PARENTS
        try:
            parents: List[bytes] = await self.backend.get_tx_parents()
        except Exception as e:
            self._fail_job(job, f"Failed to get parents ({type(e).__name__}): {e}")
            return

        job.set_parents(parents)
        await self._mine_job(job)

    async def _mine_job(self, job: TxJob) -> None:
        """Solve PoW for a transaction job.

        The actual nonce iteration runs in a thread executor (via run_in_executor)
        to avoid blocking the async event loop. This is important because the
        BlockMiner loop and HTTP server share the same event loop — a blocking
        solve_tx call would stall block production and API responses.

        The job ends up solved, or failed when the solver gives up, raises, or
        (with propagate=True) the fullnode rejects the transaction. In every
        one of those cases the job is scheduled for cleanup.
        """
        job.status = JobStatus.MINING
        job.started_at = start = txstratum.time.time()

        loop = asyncio.get_running_loop()
        try:
            tx = job.get_tx()
            # Update timestamp to current time — the fullnode validates that tx
            # timestamps are within an acceptable delta of the current time.
            tx.timestamp = int(txstratum.time.time())
            solved = await loop.run_in_executor(None, solve_tx, tx)
        except Exception as e:
            # Without this, the task would die silently and the job would stay
            # in MINING forever and never be cleaned up. Cancellation is not
            # an Exception subclass, so it still propagates.
            self.log.exception(
                "Unexpected error while solving TxJob", job_id=job.uuid.hex()
            )
            self._fail_job(job, f"Failed to solve PoW ({type(e).__name__}): {e}")
            return

        if not solved:
            self._fail_job(job, "Failed to solve PoW")
            return

        elapsed_ms = (txstratum.time.time() - start) * 1000
        nonce = tx.get_struct_nonce()

        # If propagate=True, push the solved tx to the fullnode. This mirrors
        # the production behavior where the stratum miner's solution is
        # automatically pushed to the network.
        if job.propagate:
            try:
                await self.backend.push_tx_or_block(bytes(tx))
            except Exception as e:
                self._fail_job(
                    job, f"Fullnode rejected transaction ({type(e).__name__}): {e}"
                )
                return

        job.mark_as_solved(nonce=nonce, timestamp=tx.timestamp)
        self.txs_solved += 1

        self.log.info(
            "TxJob solved (dev-miner)",
            job_id=job.uuid.hex(),
            hash=tx.hash_hex,
            weight=tx.weight,
            nonce=tx.nonce,
            time_ms=f"{elapsed_ms:.1f}",
        )
        self._schedule_cleanup(job)

    def cancel_job(self, job: TxJob) -> None:
        """Cancel tx mining job.

        Cancels the job's task (if one is still registered), marks the job as
        CANCELLED and stops tracking it.

        Raises ValueError if the job has already reached an after-mining state.
        """
        if job.status in JobStatus.get_after_mining_states():
            raise ValueError("Job has already finished")
        task = self._tasks.pop(job.uuid, None)
        if task is not None:
            task.cancel()
        job.status = JobStatus.CANCELLED
        self.tx_jobs.pop(job.uuid)
        self.log.info("TxJob cancelled", job_id=job.uuid.hex())

    def _schedule_cleanup(self, job: TxJob) -> None:
        """Schedule removal of a completed job after TX_CLEAN_UP_INTERVAL.

        Jobs are kept around after completion so the API polling endpoint
        can still return their status. Same behavior as production.
        """
        loop = asyncio.get_event_loop()
        loop.call_later(self.TX_CLEAN_UP_INTERVAL, self._cleanup_job, job)

    def _cleanup_job(self, job: TxJob) -> None:
        """Remove job from tracking (no-op if it is already gone)."""
        self.tx_jobs.pop(job.uuid, None)

    def shutdown(self) -> None:
        """Shutdown the manager (stop accepting new jobs)."""
        self.refuse_new_jobs = True

    def has_any_miner(self) -> bool:
        """Return True; the dev-miner itself is the miner.

        The health check uses this to determine if the service is operational.
        In production, this checks for connected stratum miners.
        """
        return True

    def has_any_submitted_job_in_period(self, period: int) -> bool:
        """Return True to indicate the dev-miner is always active.

        In production, this checks recent job activity to detect stale services.
        The dev-miner is always "active" by definition; `period` is ignored.
        """
        return True
def solve_tx(tx: BaseTransaction) -> bool:
    """Brute-force a valid PoW nonce for `tx`, mutating it in place.

    Nonces are tried in increasing order from 0 up to (but not including)
    MAX_NONCE. For each one, the nonce is stored on the transaction, its
    hash is recomputed, and the PoW is verified. The search stops at the
    first nonce that verifies, leaving that nonce and hash on `tx`.

    This call blocks until it finishes — run it in a thread executor when
    calling from async code.

    Returns True if a valid nonce was found, False if every candidate failed.
    """

    def _passes(candidate: int) -> bool:
        # Apply the candidate and check whether the resulting hash is valid.
        tx.nonce = candidate
        tx.update_hash()
        return bool(tx.verify_pow())

    # any() short-circuits on the first passing nonce, like an early return.
    return any(_passes(candidate) for candidate in range(MAX_NONCE))
class MiningManager(Protocol):
    """Structural contract satisfied by TxMiningManager and DevMiningManager.

    App (the HTTP API) and HealthCheck depend on this interface rather than
    on either concrete manager. Because it is a Protocol, the two managers
    need no common base class: matching these members is enough.
    """

    # Jobs known to the manager, keyed by job uuid.
    tx_jobs: Dict[bytes, TxJob]

    def status(self) -> Dict[Any, Any]:
        """Return a dict describing the manager's current state."""
        ...

    def add_job(self, job: TxJob) -> bool:
        """Register a transaction job so that it gets mined."""
        ...

    def get_job(self, uuid: bytes) -> Optional[TxJob]:
        """Look up a job by uuid; None when no such job is tracked."""
        ...

    def cancel_job(self, job: TxJob) -> None:
        """Cancel the given mining job."""
        ...

    def has_any_miner(self) -> bool:
        """Tell whether at least one miner is active."""
        ...

    def has_any_submitted_job_in_period(self, period: int) -> bool:
        """Tell whether a job was submitted within the given period."""
        ...