diff --git a/.github/workflows/hive-consume.yaml b/.github/workflows/hive-consume.yaml index cb040765f9..1fc8e9842c 100644 --- a/.github/workflows/hive-consume.yaml +++ b/.github/workflows/hive-consume.yaml @@ -34,15 +34,28 @@ jobs: - name: consume-engine mode: simulator simulator: ethereum/eels/consume-engine + sim_limit: ".*test_block_at_rlp_limit_with_logs.*Osaka.*" + # TODO: Enable once eels/consume-enginex simulator is added to Hive + # - name: consume-enginex + # mode: simulator + # simulator: ethereum/eels/consume-enginex + # sim_limit: ".*push0.*(Shanghai|Cancun).*" - name: consume-rlp mode: simulator simulator: ethereum/eels/consume-rlp + sim_limit: ".*test_block_at_rlp_limit_with_logs.*Osaka.*" - name: consume-sync mode: simulator simulator: ethereum/eels/consume-sync + sim_limit: ".*test_block_at_rlp_limit_with_logs.*Osaka.*" - name: dev-mode mode: dev consume_command: engine + test_filter: "Osaka and test_block_at_rlp_limit_with_logs" + - name: dev-mode-enginex + mode: dev + consume_command: enginex + test_filter: "(fork_shanghai or fork_cancun) and push0" steps: - name: Checkout execution-specs uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0 @@ -85,7 +98,7 @@ jobs: --client go-ethereum \ --client-file ../execution-specs/.github/configs/hive/latest.yaml \ --sim.buildarg fixtures=${{ env.FIXTURES_URL }} \ - --sim.limit=".*test_block_at_rlp_limit_with_logs.*Osaka.*" \ + --sim.limit="${{ matrix.sim_limit }}" \ --docker.output - name: Start Hive in dev mode @@ -105,4 +118,4 @@ jobs: HIVE_SIMULATOR: ${{ steps.start-hive.outputs.hive-url }} run: | uv sync --all-extras - uv run consume ${{ matrix.consume_command }} --input ${{ env.FIXTURES_URL }} -k "Osaka and test_block_at_rlp_limit_with_logs" + uv run consume ${{ matrix.consume_command }} --input ${{ env.FIXTURES_URL }} -k "${{ matrix.test_filter }}" diff --git a/docs/running_tests/running.md b/docs/running_tests/running.md index efd98c340d..a09714d68c 100644 --- 
a/docs/running_tests/running.md +++ b/docs/running_tests/running.md @@ -14,6 +14,7 @@ Both `consume` and `execute` provide sub-commands which correspond to different | [`consume direct`](#direct) | Client consume tests via a `statetest` interface | EVM | None | Module test | | [`consume direct`](#direct) | Client consume tests via a `blocktest` interface | EVM, block processing | None | Module test,
Integration test | | [`consume engine`](#engine) | Client imports blocks via Engine API `EngineNewPayload` in Hive | EVM, block processing, Engine API | Staging, Hive | System test | +| [`consume enginex`](#enginex) | Client imports blocks via Engine API in Hive, optimized by client reuse | EVM, block processing, Engine API | Staging, Hive | System test | | [`consume sync`](#sync) | Client syncs from another client using Engine API in Hive | EVM, block processing, Engine API, P2P sync | Staging, Hive | System test | | [`consume rlp`](#rlp) | Client imports RLP-encoded blocks upon start-up in Hive | EVM, block processing, RLP import (sync\*) | Staging, Hive | System test | | [`execute hive`](./execute/hive.md) | Tests executed against a client via JSON RPC `eth_sendRawTransaction` in Hive | EVM, JSON RPC, mempool | Staging, Hive | System test | @@ -62,6 +63,48 @@ The `consume engine` command: 5. **Validates responses** against expected results. 6. **Tests error conditions** and exception handling. +## EngineX + +| Nomenclature | | +| -------------- | -------------------------- | +| Command | `consume enginex` | +| Simulator | `eels/consume-enginex` | +| Fixture format | `blockchain_test_engine_x` | + +The EngineX method is a faster alternative to `consume engine` that executes multiple tests against a single client instance. This is achieved via the [Blockchain Engine X Test fixture format](./test_formats/blockchain_test_engine_x.md) which groups tests that share the same fork and EVM [Environment](./test_formats/state_test.md#fixtureenvironment) together and contains a larger, shared pre-allocation state that all tests in the group use. This allows the EngineX simulator to execute multiple tests against the same client instance, whereas the Engine Simulator starts a fresh client for each test. + +The `consume enginex` command, for each pre-allocation group: + +1. **Initializes the execution client** with the group's shared genesis state. +2. 
**Connects via Engine API** (port 8551). +3. **Executes all tests in the group** against the same client: + + - Submits payloads from each test using `engine_newPayload` calls. + - Validates responses against expected results. + - Tests error conditions and exception handling. + +4. **Stops the client** when all tests in the group complete. + +### Engine vs EngineX + +| | `consume engine` | `consume enginex` | +| -------------------- | ---------------------------------------------------------------------- | ------------------------------------------------------------------------ | +| **Fixture format** | [`blockchain_test_engine`](./test_formats/blockchain_test_engine.md) | [`blockchain_test_engine_x`](./test_formats/blockchain_test_engine_x.md) | +| **Client lifecycle** | New client per test | Client reused across tests with same pre-alloc | +| **Fork choice update** | FCU called for genesis and final payload | FCU for genesis and final payload skipped | +| **Execution speed** | Slower (client startup overhead) | Faster (amortized startup cost) | +| **Test isolation** | Full isolation | Shared genesis state within group | + +EngineX achieves faster execution by: + +1. **Grouping tests** by their pre-allocation state (genesis configuration). +2. **Reusing clients** across all tests in a group, avoiding repeated client startup. +3. **Skipping redundant initialization** since the client is already at the expected genesis state. + +!!! note "When to use EngineX vs Engine" + + Use `consume enginex` for faster test runs when full per-test isolation is not required. Use `consume engine` when you need complete isolation between tests or when debugging issues triggered by a single test case. 
+ ## RLP | Nomenclature | | diff --git a/packages/testing/src/execution_testing/cli/pytest_commands/consume.py b/packages/testing/src/execution_testing/cli/pytest_commands/consume.py index 44364b8940..9667288bdd 100644 --- a/packages/testing/src/execution_testing/cli/pytest_commands/consume.py +++ b/packages/testing/src/execution_testing/cli/pytest_commands/consume.py @@ -43,12 +43,13 @@ def create_consume_command( def get_command_logic_test_paths(command_name: str) -> List[Path]: """Determine the command paths based on the command name and hive flag.""" base_path = Path("cli/pytest_commands/plugins/consume") - if command_name in ["engine", "rlp"]: + if command_name in ["engine", "enginex", "rlp"]: + test_command = "engine" if command_name == "enginex" else command_name command_logic_test_paths = [ base_path / "simulators" / "simulator_logic" - / f"test_via_{command_name}.py" + / f"test_via_{test_command}.py" ] elif command_name == "sync": command_logic_test_paths = [ @@ -119,6 +120,12 @@ def engine() -> None: pass +@consume_command(is_hive=True) +def enginex() -> None: + """Client consumes via the Engine API using Engine X fixtures with pre-alloc optimization.""" + pass + + @consume_command(is_hive=True) def sync() -> None: """Client consumes via the Engine API with sync testing.""" diff --git a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/consume/consume.py b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/consume/consume.py index 0f069b61df..684bad203b 100644 --- a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/consume/consume.py +++ b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/consume/consume.py @@ -19,7 +19,11 @@ from execution_testing.cli.gen_index import ( generate_fixtures_index, ) -from execution_testing.fixtures import BaseFixture, FixtureFormat +from execution_testing.fixtures import ( + BaseFixture, + BlockchainEngineXFixture, + FixtureFormat, +) from 
execution_testing.fixtures.consume import IndexFile, TestCases from execution_testing.forks import ( get_forks, @@ -588,11 +592,24 @@ def pytest_generate_tests(metafunc: pytest.Metafunc) -> None: fork_markers = get_relative_fork_markers( test_case.fork, strict_mode=False ) + + # Build base marks (fork and format) + marks = [getattr(pytest.mark, m) for m in fork_markers] + [ + getattr(pytest.mark, test_case.format.format_name) + ] + + # Add xdist_group marker for engine x tests to enable client reuse tracking + if test_case.format is BlockchainEngineXFixture: + assert hasattr(test_case, "pre_hash") and test_case.pre_hash, ( + f"BlockchainEngineXFixture test case '{test_case.id}' missing pre_hash" + ) + group_identifier = test_case.pre_hash + marks.append(pytest.mark.xdist_group(name=group_identifier)) + param = pytest.param( test_case, id=test_case.id, - marks=[getattr(pytest.mark, m) for m in fork_markers] - + [getattr(pytest.mark, test_case.format.format_name)], + marks=marks, ) param_list.append(param) diff --git a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/consume/simulators/__init__.py b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/consume/simulators/__init__.py index 858f14ab07..d2cafc0261 100644 --- a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/consume/simulators/__init__.py +++ b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/consume/simulators/__init__.py @@ -1 +1 @@ -"""Consume hive simulators test functions.""" +"""Consume Hive simulators test functions.""" diff --git a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/consume/simulators/base.py b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/consume/simulators/base.py index 3f9aa3c18d..5ed8827446 100644 --- a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/consume/simulators/base.py +++ 
b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/consume/simulators/base.py @@ -30,7 +30,11 @@ def check_live_port(test_suite_name: str) -> Literal[8545, 8551]: """Port used by hive to check for liveness of the client.""" if test_suite_name == "eels/consume-rlp": return 8545 - elif test_suite_name in {"eels/consume-engine", "eels/consume-sync"}: + elif test_suite_name in { + "eels/consume-engine", + "eels/consume-enginex", + "eels/consume-sync", + }: return 8551 raise ValueError( f"Unexpected test suite name '{test_suite_name}' while setting HIVE_CHECK_LIVE_PORT." diff --git a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/consume/simulators/engine/conftest.py b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/consume/simulators/engine/conftest.py index 518ed1799b..308c0cfe85 100644 --- a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/consume/simulators/engine/conftest.py +++ b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/consume/simulators/engine/conftest.py @@ -8,11 +8,9 @@ from typing import Mapping import pytest -from hive.client import Client -from execution_testing.exceptions import ExceptionMapper from execution_testing.fixtures import BlockchainEngineFixture -from execution_testing.rpc import EngineRPC +from execution_testing.fixtures.blockchain import FixtureHeader pytest_plugins = ( "execution_testing.cli.pytest_commands.plugins.pytest_hive.pytest_hive", @@ -21,6 +19,7 @@ "execution_testing.cli.pytest_commands.plugins.consume.simulators.test_case_description", "execution_testing.cli.pytest_commands.plugins.consume.simulators.timing_data", "execution_testing.cli.pytest_commands.plugins.consume.simulators.exceptions", + "execution_testing.cli.pytest_commands.plugins.consume.simulators.engine_api", ) @@ -29,21 +28,6 @@ def pytest_configure(config: pytest.Config) -> None: config.supported_fixture_formats = [BlockchainEngineFixture] # type: ignore[attr-defined] 
-@pytest.fixture(scope="function") -def engine_rpc( - client: Client, client_exception_mapper: ExceptionMapper | None -) -> EngineRPC: - """Initialize engine RPC client for the execution client under test.""" - if client_exception_mapper: - return EngineRPC( - f"http://{client.ip}:8551", - response_validation_context={ - "exception_mapper": client_exception_mapper, - }, - ) - return EngineRPC(f"http://{client.ip}:8551") - - @pytest.fixture(scope="module") def test_suite_name() -> str: """The name of the hive test suite used in this simulator.""" @@ -64,3 +48,9 @@ def client_files( files = {} files["/genesis.json"] = buffered_genesis return files + + +@pytest.fixture(scope="function") +def genesis_header(fixture: BlockchainEngineFixture) -> "FixtureHeader": + """Provide the genesis header from the fixture (engine mode compatibility).""" + return fixture.genesis diff --git a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/consume/simulators/engine_api.py b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/consume/simulators/engine_api.py new file mode 100644 index 0000000000..8e09463291 --- /dev/null +++ b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/consume/simulators/engine_api.py @@ -0,0 +1,36 @@ +"""Pytest fixtures for Engine API RPC clients.""" + +import pytest +from hive.client import Client + +from execution_testing.exceptions import ExceptionMapper +from execution_testing.rpc import EngineRPC + + +@pytest.fixture(scope="function") +def engine_rpc( + client: Client, client_exception_mapper: ExceptionMapper | None +) -> EngineRPC: + """ + Initialize Engine RPC client for the execution client under test. + + This fixture provides a configured EngineRPC instance that communicates with + the client's Engine API endpoint (port 8551). If an exception mapper is + available, it will be used for response validation to map client-specific + error messages to standard exception types. 
+ + Args: + client: The Hive client instance to connect to. + client_exception_mapper: Optional exception mapper for response validation. + + Returns: + Configured EngineRPC instance for making Engine API calls. + """ + if client_exception_mapper: + return EngineRPC( + f"http://{client.ip}:8551", + response_validation_context={ + "exception_mapper": client_exception_mapper, + }, + ) + return EngineRPC(f"http://{client.ip}:8551") diff --git a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/consume/simulators/enginex/__init__.py b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/consume/simulators/enginex/__init__.py new file mode 100644 index 0000000000..83f66d1f3c --- /dev/null +++ b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/consume/simulators/enginex/__init__.py @@ -0,0 +1 @@ +"""Engine X simulator for `blockchain_test_engine_x` fixtures.""" diff --git a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/consume/simulators/enginex/conftest.py b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/consume/simulators/enginex/conftest.py new file mode 100644 index 0000000000..507e5b2ce9 --- /dev/null +++ b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/consume/simulators/enginex/conftest.py @@ -0,0 +1,210 @@ +""" +Pytest fixtures for the `consume enginex` simulator. + +Configures the hive back-end & EL clients for test execution with `BlockchainEngineXFixtures`. +Uses multi-test client architecture to reuse clients across tests with the same pre-alloc group. 
+""" + +import io +import logging +from typing import Generator, Mapping + +import pytest +from hive.client import Client, ClientType +from hive.testing import HiveTest + +from execution_testing.fixtures import BlockchainEngineXFixture +from execution_testing.fixtures.blockchain import FixtureHeader +from execution_testing.fixtures.pre_alloc_groups import PreAllocGroup + +from ..helpers.test_tracker import ( + PreAllocGroupTestTracker, + enginex_group_counts_key, + format_group_identifier, +) +from ..multi_test_client import MultiTestClientManager +from ..timing_data import TimingData + +logger = logging.getLogger(__name__) + +pytest_plugins = ( + "execution_testing.cli.pytest_commands.plugins.pytest_hive.pytest_hive", + "execution_testing.cli.pytest_commands.plugins.consume.simulators.base", + "execution_testing.cli.pytest_commands.plugins.consume.simulators.multi_test_client", + "execution_testing.cli.pytest_commands.plugins.consume.simulators.test_case_description", + "execution_testing.cli.pytest_commands.plugins.consume.simulators.timing_data", + "execution_testing.cli.pytest_commands.plugins.consume.simulators.exceptions", + "execution_testing.cli.pytest_commands.plugins.consume.simulators.helpers.test_tracker", + "execution_testing.cli.pytest_commands.plugins.consume.simulators.engine_api", +) + + +def pytest_configure(config: pytest.Config) -> None: + """Set the supported fixture formats for the enginex simulator.""" + config.supported_fixture_formats = [BlockchainEngineXFixture] # type: ignore[attr-defined] + + +@pytest.hookimpl(trylast=True) +def pytest_collection_modifyitems( + session: pytest.Session, config: pytest.Config, items: list[pytest.Item] +) -> None: + """ + Count tests per pre-allocation group during collection phase. + + This hook analyzes all collected test items to determine how many tests + belong to each pre-alloc group, enabling automatic client cleanup when + all tests in a group complete. 
+ + Uses `trylast=True` to run after test deselection (from `-k`, `-m` filters). + Reads group identifiers from `xdist_group` markers added in + `pytest_generate_tests`. + """ + supported_formats = getattr(config, "supported_fixture_formats", []) + if BlockchainEngineXFixture not in supported_formats: + return + + group_counts: dict[str, int] = {} + + for item in items: + # Extract group identifier from xdist_group marker + # (marker was added in pytest_generate_tests in consume.py) + group_identifier = None + for marker in item.iter_markers("xdist_group"): + if hasattr(marker, "kwargs") and "name" in marker.kwargs: + group_identifier = marker.kwargs["name"] + break + + if group_identifier: + group_counts[group_identifier] = ( + group_counts.get(group_identifier, 0) + 1 + ) + + if group_counts: + # Store counts in session stash for the test tracker fixture to use + session.stash[enginex_group_counts_key] = group_counts + logger.info( + f"Counted {len(group_counts)} pre-alloc groups with " + f"{sum(group_counts.values())} total tests" + ) + + # Sort tests by group_identifier to ensure consecutive execution + # This minimizes client thrashing and enables immediate client cleanup + def get_group_key(item: pytest.Item) -> str: + """Extract group identifier from item for sorting.""" + for marker in item.iter_markers("xdist_group"): + if hasattr(marker, "kwargs") and "name" in marker.kwargs: + return marker.kwargs["name"] + raise AssertionError( + f"EngineX test '{item.nodeid}' missing xdist_group marker" + ) + + items.sort(key=get_group_key) + logger.info( + "Sorted tests by pre-alloc group for consecutive execution" + ) + else: + logger.warning("No enginex test groups found during collection") + + +@pytest.fixture(scope="session", autouse=True) +def _configure_client_manager( + multi_test_client_manager: MultiTestClientManager, + pre_alloc_group_test_tracker: PreAllocGroupTestTracker, +) -> None: + """Wire the test tracker to the client manager at session start.""" + 
multi_test_client_manager.set_test_tracker(pre_alloc_group_test_tracker) + + +@pytest.fixture(scope="module") +def test_suite_name() -> str: + """The name of the hive test suite used in this simulator.""" + return "eels/consume-enginex" + + +@pytest.fixture(scope="module") +def test_suite_description() -> str: + """The description of the hive test suite used in this simulator.""" + return ( + "Execute blockchain tests against clients using the Engine API with " + "pre-allocation group optimization using Engine X fixtures." + ) + + +@pytest.fixture(scope="function") +def check_live_port(test_suite_name: str) -> int: + """Port used by hive to check for liveness of the client.""" + return 8551 # Engine API port + + +@pytest.fixture(scope="function") +def client( + shared_hive_test: HiveTest, + multi_test_client_manager: MultiTestClientManager, + fixture: BlockchainEngineXFixture, + client_type: ClientType, + environment: dict, + client_files: Mapping[str, io.BufferedReader], + total_timing_data: TimingData, + request: pytest.FixtureRequest, +) -> Generator[Client, None, None]: + """ + Get or create a shared client for this test's pre-allocation group. + + This function-scoped fixture is called for each test, but it reuses clients + across tests that share the same pre-allocation group. 
+ """ + group_identifier = fixture.pre_hash + test_id = request.node.nodeid + + # Check for existing client + existing_client = multi_test_client_manager.get_client(group_identifier) + if existing_client is not None: + logger.info( + f"♻️ Reusing client for group " + f"{format_group_identifier(group_identifier)}" + ) + try: + yield existing_client + finally: + multi_test_client_manager.mark_test_completed( + group_identifier, test_id + ) + return + + # Start new client + logger.info( + f"🚀 Starting client ({client_type.name}) for group " + f"{format_group_identifier(group_identifier)}" + ) + + with total_timing_data.time("Start client"): + client = shared_hive_test.start_client( + client_type=client_type, + environment=environment, + files=client_files, + ) + + assert client is not None, ( + f"Unable to connect to client ({client_type.name}) via Hive. " + "Check the client or Hive server logs for more information." + ) + + logger.info( + f"Client ({client_type.name}) ready for group " + f"{format_group_identifier(group_identifier)}" + ) + + multi_test_client_manager.register_client(group_identifier, client) + + try: + yield client + finally: + multi_test_client_manager.mark_test_completed( + group_identifier, test_id + ) + + +@pytest.fixture(scope="function") +def genesis_header(pre_alloc_group: PreAllocGroup) -> FixtureHeader: + """Provide the genesis header from the pre-allocation group.""" + return pre_alloc_group.genesis diff --git a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/consume/simulators/helpers/test_tracker.py b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/consume/simulators/helpers/test_tracker.py new file mode 100644 index 0000000000..8f30a10216 --- /dev/null +++ b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/consume/simulators/helpers/test_tracker.py @@ -0,0 +1,123 @@ +"""Test completion tracking for multi-test client architectures.""" + +import logging +from typing import Dict, Set 
+ +import pytest +from pytest import StashKey + +logger = logging.getLogger(__name__) + +# Typed stash keys for session-scoped data (replaces dynamic attributes) +enginex_group_counts_key: StashKey[Dict[str, int]] = StashKey() + + +def format_group_identifier(group_identifier: str, max_len: int = 16) -> str: + """ + Safely format group identifier for logging. + + Args: + group_identifier: Group identifier string (e.g., pre_hash) + max_len: Maximum length for formatted output + + Returns: + Formatted string, truncated if necessary + """ + if len(group_identifier) <= max_len: + return group_identifier + return group_identifier[:max_len] + + +class PreAllocGroupTestTracker: + """ + Tracks test completion per pre-allocation group to enable automatic client cleanup. + + This tracker maintains counts of expected vs. completed tests for each group. + When all tests in a group complete, it signals that the associated client can be stopped. + """ + + def __init__(self) -> None: + """Initialize the test tracker.""" + self.expected_counts: Dict[ + str, int + ] = {} # group_identifier -> total expected tests + self.completed_tests: Dict[ + str, Set[str] + ] = {} # group_identifier -> set of completed test IDs + logger.info("PreAllocGroupTestTracker initialized") + + def set_group_test_count(self, group_identifier: str, count: int) -> None: + """ + Set the expected number of tests for a group. + + This is typically called during pytest collection phase. + + Args: + group_identifier: The group identifier + count: Expected number of tests in this group + """ + self.expected_counts[group_identifier] = count + self.completed_tests[group_identifier] = set() + logger.debug( + f"Set expected test count for group {format_group_identifier(group_identifier)}: {count}" + ) + + def mark_test_completed(self, group_identifier: str, test_id: str) -> bool: + """ + Mark a test as completed and check if the group is now complete. 
+ + Args: + group_identifier: The group identifier + test_id: Unique identifier for the test + + Returns: + True if all tests in the group are now complete, False otherwise + """ + if group_identifier not in self.completed_tests: + logger.warning( + f"Marking test complete for unknown group {format_group_identifier(group_identifier)}, initializing" + ) + self.completed_tests[group_identifier] = set() + + self.completed_tests[group_identifier].add(test_id) + completed = len(self.completed_tests[group_identifier]) + expected = self.expected_counts.get(group_identifier, 0) + + logger.debug( + f"Group {format_group_identifier(group_identifier)}: {completed}/{expected} tests completed" + ) + + # Check if group is complete + is_complete = completed >= expected and expected > 0 + if is_complete: + logger.info( + f"✓ Pre-alloc group {format_group_identifier(group_identifier)} complete " + f"({completed}/{expected} tests)" + ) + + return is_complete + + +@pytest.fixture(scope="session") +def pre_alloc_group_test_tracker( + request: pytest.FixtureRequest, +) -> PreAllocGroupTestTracker: + """ + Provide session-scoped test tracker for automatic client cleanup. + + This fixture initializes the tracker and populates it with test counts + from the collection phase (if available via pytest stash). 
+ """ + tracker = PreAllocGroupTestTracker() + + # Load test counts from session stash (set during collection) + session = request.session + group_counts = session.stash.get(enginex_group_counts_key, None) + if group_counts is not None: + for group_identifier, count in group_counts.items(): + tracker.set_group_test_count(group_identifier, count) + logger.info( + f"Loaded {len(group_counts)} group counts from session stash" + ) + + return tracker diff --git a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/consume/simulators/multi_test_client.py b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/consume/simulators/multi_test_client.py new file mode 100644 index 0000000000..f6e57113d0 --- /dev/null +++ b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/consume/simulators/multi_test_client.py @@ -0,0 +1,328 @@ +"""Common pytest fixtures for simulators with multi-test client architecture.""" + +import io +import json +import logging +from typing import Dict, Generator, Optional, cast + +import pytest +from hive.client import Client + +from execution_testing.base_types import to_json +from execution_testing.fixtures import BlockchainEngineXFixture +from execution_testing.fixtures.pre_alloc_groups import PreAllocGroup + +from ..consume import FixturesSource +from .helpers.ruleset import ruleset +from .helpers.test_tracker import ( + PreAllocGroupTestTracker, + format_group_identifier, +) + +logger = logging.getLogger(__name__) + + +class MultiTestClientManager: + """ + Session-scoped manager for client lifecycle across multiple tests. + + This manager coordinates client reuse across tests sharing the same pre-allocation + group, enabling efficient test execution by avoiding redundant client restarts. 
+ """ + + def __init__(self) -> None: + """Initialize the multi-test client manager.""" + self.clients: Dict[str, Client] = {} # group_identifier -> Client + self.test_tracker: Optional[PreAllocGroupTestTracker] = None + logger.info("MultiTestClientManager initialized") + + def set_test_tracker(self, tracker: PreAllocGroupTestTracker) -> None: + """ + Set the test tracker for automatic client cleanup. + + Args: + tracker: The PreAllocGroupTestTracker instance + """ + self.test_tracker = tracker + logger.debug("Test tracker registered with MultiTestClientManager") + + def get_client(self, group_identifier: str) -> Optional[Client]: + """ + Get the client instance for a group. + + Args: + group_identifier: The group identifier + + Returns: + The client instance if available, None otherwise + """ + if group_identifier in self.clients: + logger.debug( + f"Found existing client for group " + f"{format_group_identifier(group_identifier)}" + ) + return self.clients[group_identifier] + + logger.debug( + f"No existing client for group " + f"{format_group_identifier(group_identifier)}" + ) + return None + + def register_client(self, group_identifier: str, client: Client) -> None: + """ + Register a newly started client for a group. + + Args: + group_identifier: The group identifier + client: The started client instance + """ + if group_identifier in self.clients: + raise RuntimeError( + f"Client already exists for group " + f"{format_group_identifier(group_identifier)}" + ) + + self.clients[group_identifier] = client + logger.info( + f"Registered client for group " + f"{format_group_identifier(group_identifier)}" + ) + + def mark_test_completed(self, group_identifier: str, test_id: str) -> None: + """ + Mark a test as completed and trigger automatic client cleanup if appropriate. 
+ + Args: + group_identifier: The group identifier + test_id: Unique identifier for the test + """ + if self.test_tracker is None: + logger.warning( + "Test tracker not set, cannot perform automatic cleanup" + ) + return + + is_group_complete = self.test_tracker.mark_test_completed( + group_identifier, test_id + ) + + # Stop the client immediately when all tests in the group are complete + if is_group_complete: + logger.info( + f"✓ Group {format_group_identifier(group_identifier)} complete" + ) + if group_identifier in self.clients: + client = self.clients[group_identifier] + try: + logger.info( + f"🛑 Stopping client for group " + f"{format_group_identifier(group_identifier)}" + ) + client.stop() + except Exception as e: + logger.error( + f"Error stopping client for group " + f"{format_group_identifier(group_identifier)}: {e}" + ) + finally: + # Always remove from tracking, even if stop failed + del self.clients[group_identifier] + + def stop_all_clients(self) -> None: + """Stop all remaining clients (called at session end).""" + if not self.clients: + logger.info("No clients to clean up") + return + + logger.info(f"Stopping {len(self.clients)} remaining client(s)...") + for group_identifier, client in list(self.clients.items()): + try: + logger.info( + f"Stopping client for group " + f"{format_group_identifier(group_identifier)}" + ) + client.stop() + except Exception as e: + logger.error( + f"Error stopping client for group " + f"{format_group_identifier(group_identifier)}: {e}" + ) + + self.clients.clear() + logger.info("All clients stopped") + + +@pytest.fixture(scope="session") +def multi_test_client_manager() -> Generator[ + MultiTestClientManager, None, None +]: + """ + Provide session-scoped MultiTestClientManager with automatic cleanup. 
+ + Yields: + The MultiTestClientManager instance + """ + manager = MultiTestClientManager() + try: + yield manager + finally: + logger.info("Session ending, cleaning up multi-test clients...") + manager.stop_all_clients() + + +@pytest.fixture(scope="session") +def pre_alloc_group_cache() -> Dict[str, PreAllocGroup]: + """Cache for pre-allocation groups to avoid reloading from disk.""" + return {} + + +@pytest.fixture(scope="session") +def client_genesis_cache() -> Dict[str, dict]: + """Cache for client genesis configs to avoid redundant to_json calls.""" + return {} + + +@pytest.fixture(scope="session") +def environment_cache() -> Dict[str, dict]: + """Cache for environment configs to avoid redundant computation.""" + return {} + + +@pytest.fixture(scope="function") +def pre_alloc_group( + fixture: BlockchainEngineXFixture, + fixtures_source: FixturesSource, + pre_alloc_group_cache: Dict[str, PreAllocGroup], +) -> PreAllocGroup: + """ + Load the pre-allocation group for the current test case. 
+ + Args: + fixture: The BlockchainEngineXFixture for this test + fixtures_source: The source of fixture files + pre_alloc_group_cache: Session-scoped cache to avoid reloading + + Returns: + The PreAllocGroup for this test's pre_hash + """ + pre_hash = fixture.pre_hash + + # Check cache first + if pre_hash in pre_alloc_group_cache: + logger.debug( + f"Using cached pre-alloc group for " + f"{format_group_identifier(pre_hash)}" + ) + return pre_alloc_group_cache[pre_hash] + + # Load from disk + if fixtures_source.is_stdin: + raise ValueError( + "Pre-allocation groups require file-based fixture input" + ) + + # Look for pre-allocation group file + pre_alloc_path = ( + fixtures_source.path + / "blockchain_tests_engine_x" + / "pre_alloc" + / f"{pre_hash}.json" + ) + + if not pre_alloc_path.exists(): + raise FileNotFoundError( + f"Pre-allocation group file not found: {pre_alloc_path}" + ) + + # Load and cache + logger.debug(f"Loading pre-alloc group from {pre_alloc_path}") + pre_alloc_group_obj = PreAllocGroup.model_validate_json( + pre_alloc_path.read_text() + ) + + pre_alloc_group_cache[pre_hash] = pre_alloc_group_obj + logger.info( + f"Loaded pre-alloc group for {format_group_identifier(pre_hash)}" + ) + + return pre_alloc_group_obj + + +@pytest.fixture(scope="function") +def client_genesis( + pre_alloc_group: PreAllocGroup, + fixture: BlockchainEngineXFixture, + client_genesis_cache: Dict[str, dict], +) -> dict: + """ + Convert pre-alloc group genesis header and pre-state to client genesis. + + Parallel to single_test_client.client_genesis but uses PreAllocGroup. + Uses caching to avoid redundant to_json calls for tests sharing the same pre_hash. 
+ """ + pre_hash = fixture.pre_hash + + if pre_hash in client_genesis_cache: + return client_genesis_cache[pre_hash] + + genesis = to_json(pre_alloc_group.genesis) + alloc = to_json(pre_alloc_group.pre) + # NOTE: nethermind requires account keys without '0x' prefix + genesis["alloc"] = {k.replace("0x", ""): v for k, v in alloc.items()} + + client_genesis_cache[pre_hash] = genesis + return genesis + + +@pytest.fixture(scope="function") +def environment( + pre_alloc_group: PreAllocGroup, + fixture: BlockchainEngineXFixture, + check_live_port: int, + environment_cache: Dict[str, dict], +) -> dict: + """ + Define environment variables for multi-test client startup. + + Parallel to single_test_client.environment but uses PreAllocGroup. + Uses caching to avoid redundant computation for tests sharing the same pre_hash. + """ + pre_hash = fixture.pre_hash + + if pre_hash in environment_cache: + return environment_cache[pre_hash] + + fork = pre_alloc_group.fork + assert fork in ruleset, f"fork '{fork}' missing in hive ruleset" + env = { + "HIVE_CHAIN_ID": "1", + "HIVE_NETWORK_ID": "1", + "HIVE_FORK_DAO_VOTE": "1", + "HIVE_NODETYPE": "full", + "HIVE_CHECK_LIVE_PORT": str(check_live_port), + **{k: f"{v:d}" for k, v in ruleset[fork].items()}, + "HIVE_FORK": pre_alloc_group.fork.name(), + "HIVE_GROUP_ID": format_group_identifier(fixture.pre_hash), + } + + environment_cache[pre_hash] = env + return env + + +@pytest.fixture(scope="function") +def buffered_genesis(client_genesis: dict) -> io.BufferedReader: + """ + Create buffered reader for genesis.json. + + Identical to single_test_client.buffered_genesis. 
+ """ + genesis_json = json.dumps(client_genesis) + genesis_bytes = genesis_json.encode("utf-8") + return io.BufferedReader(cast(io.RawIOBase, io.BytesIO(genesis_bytes))) + + +@pytest.fixture(scope="function") +def client_files(buffered_genesis: io.BufferedReader) -> dict: + """Define files for Hive client startup.""" + return {"/genesis.json": buffered_genesis} diff --git a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/consume/simulators/simulator_logic/test_via_engine.py b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/consume/simulators/simulator_logic/test_via_engine.py index f6a1973b12..ad2a2154ea 100644 --- a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/consume/simulators/simulator_logic/test_via_engine.py +++ b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/consume/simulators/simulator_logic/test_via_engine.py @@ -1,15 +1,25 @@ """ A hive based simulator that executes blocks against clients using the -`engine_newPayloadVX` method from the Engine API. The simulator uses the -`BlockchainEngineFixtures` to test against clients. +`engine_newPayloadVX` method from the Engine API. + +The unified test function in this module supports both: +- `BlockchainEngineFixtures`, the original engine mode with a 1-to-1 relationship between client instance and test, i.e., each test is executed against a fresh client instance. +- `BlockchainEngineXFixtures`, enginex mode with client reuse across tests that share a pre-alloc group. Each `engine_newPayloadVX` is verified against the appropriate VALID/INVALID responses.
""" +from typing import Union + from execution_testing.exceptions import UndefinedException -from execution_testing.fixtures import BlockchainEngineFixture +from execution_testing.fixtures import ( + BlockchainEngineFixture, + BlockchainEngineXFixture, +) from execution_testing.logging import get_logger + +from execution_testing.fixtures.blockchain import FixtureHeader from execution_testing.rpc import ( EngineRPC, EthRPC, @@ -34,53 +44,73 @@ def test_blockchain_via_engine( timing_data: TimingData, eth_rpc: EthRPC, engine_rpc: EngineRPC, - fixture: BlockchainEngineFixture, + fixture: Union[BlockchainEngineFixture, BlockchainEngineXFixture], strict_exception_matching: bool, + genesis_header: FixtureHeader, ) -> None: """ - 1. Check the client genesis block hash matches - `fixture.genesis.block_hash`. - 2. Execute the test case fixture blocks against the client under test using - the `engine_newPayloadVX` method from the Engine API. - 3. For valid payloads a forkchoice update is performed to finalize the - chain. + Execute blockchain test fixtures against a client using the Engine API. + + This function supports two modes: + + 1. **Engine Mode** (`BlockchainEngineFixture`): + - Uses per-test clients (started fresh for each test). + - Always performs initial FCU to genesis. + - Always performs FCU after valid payloads. + - genesis_header comes from fixture.genesis (via fixture). + - needs_genesis_init is always True (via fixture). + + 2. **EngineX Mode** (`BlockchainEngineXFixture`): + - Reuses clients across tests with same pre-alloc group. + - Skips initial FCU for reused clients. + - Skips FCU after valid payloads to keep client at genesis. + - genesis_header comes from separate pre_alloc_group fixture. + - needs_genesis_init is False for reused clients. + + Steps: + 1. Check the client genesis block hash matches genesis_header.block_hash + 2. Execute test fixture blocks using engine_newPayloadVX + 3. 
For valid payloads, perform forkchoice update to finalize chain + (unless client is being reused, in which case skip FCU) """ - # Send initial forkchoice update - with timing_data.time("Initial forkchoice update"): - logger.info("Sending initial forkchoice update to genesis block...") - try: - response = engine_rpc.forkchoice_updated_with_retry( - forkchoice_state=ForkchoiceState( - head_block_hash=fixture.genesis.block_hash, - ), - forkchoice_version=fixture.payloads[ - 0 - ].forkchoice_updated_version, - max_attempts=30, - wait_fixed=1.0, + if isinstance(fixture, BlockchainEngineFixture): + with timing_data.time("Initial forkchoice update"): + logger.info( + "Sending initial forkchoice update to genesis block..." ) - if response.payload_status.status != PayloadStatusEnum.VALID: - raise LoggedError( - f"Unexpected status on forkchoice updated to genesis: " - f"{response.payload_status.status}" + try: + response = engine_rpc.forkchoice_updated_with_retry( + forkchoice_state=ForkchoiceState( + head_block_hash=fixture.genesis.block_hash, + ), + forkchoice_version=fixture.payloads[ + 0 + ].forkchoice_updated_version, + max_attempts=30, + wait_fixed=1.0, ) - except ForkchoiceUpdateTimeoutError as e: - raise LoggedError( - f"Timed out waiting for forkchoice update to genesis: {e}" - ) from None + if response.payload_status.status != PayloadStatusEnum.VALID: + raise LoggedError( + f"Unexpected status on forkchoice updated to genesis: " + f"{response.payload_status.status}" + ) + except ForkchoiceUpdateTimeoutError as e: + raise LoggedError( + f"Timed out waiting for forkchoice update to genesis: {e}" + ) from None with timing_data.time("Get genesis block"): logger.info("Calling getBlockByNumber to get genesis block...") genesis_block = eth_rpc.get_block_by_number(0) assert genesis_block is not None, "genesis_block is None" - if genesis_block["hash"] != str(fixture.genesis.block_hash): - expected = fixture.genesis.block_hash + if genesis_block["hash"] != 
str(genesis_header.block_hash): + expected = genesis_header.block_hash got = genesis_block["hash"] logger.fail( f"Genesis block hash mismatch. Expected: {expected}, Got: {got}" ) raise GenesisBlockMismatchExceptionError( - expected_header=fixture.genesis, + expected_header=genesis_header, got_genesis_block=genesis_block, ) @@ -174,7 +204,9 @@ def test_blockchain_via_engine( f"Unexpected error code: {e.code}, expected: {payload.error_code}" ) from e - if payload.valid(): + if payload.valid() and isinstance( + fixture, BlockchainEngineFixture + ): with payload_timing.time( f"engine_forkchoiceUpdatedV{payload.forkchoice_updated_version}" ): diff --git a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/consume/simulators/sync/conftest.py b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/consume/simulators/sync/conftest.py index d460280625..eea8525024 100644 --- a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/consume/simulators/sync/conftest.py +++ b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/consume/simulators/sync/conftest.py @@ -24,6 +24,7 @@ "execution_testing.cli.pytest_commands.plugins.consume.simulators.test_case_description", "execution_testing.cli.pytest_commands.plugins.consume.simulators.timing_data", "execution_testing.cli.pytest_commands.plugins.consume.simulators.exceptions", + "execution_testing.cli.pytest_commands.plugins.consume.simulators.engine_api", ) @@ -104,21 +105,6 @@ def pytest_collection_modifyitems( item._nodeid = base + new_suffix -@pytest.fixture(scope="function") -def engine_rpc( - client: Client, client_exception_mapper: ExceptionMapper | None -) -> EngineRPC: - """Initialize engine RPC client for the execution client under test.""" - if client_exception_mapper: - return EngineRPC( - f"http://{client.ip}:8551", - response_validation_context={ - "exception_mapper": client_exception_mapper, - }, - ) - return EngineRPC(f"http://{client.ip}:8551") - - 
@pytest.fixture(scope="function") def eth_rpc(client: Client) -> EthRPC: """Initialize eth RPC client for the execution client under test.""" diff --git a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/pytest_hive/pytest_hive.py b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/pytest_hive/pytest_hive.py index 8064e2c48e..9c46ccaaf4 100644 --- a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/pytest_hive/pytest_hive.py +++ b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/pytest_hive/pytest_hive.py @@ -245,6 +245,62 @@ def test_suite( users_file.unlink() +@pytest.fixture(scope="module") +def shared_hive_test( + test_suite: HiveTestSuite, + test_suite_name: str, +) -> Generator[HiveTest, None, None]: + """ + Create a module-scoped Hive test for running multiple pytest tests against a single client. + + This fixture provides a reusable Hive test context that persists across multiple + pytest tests within a module, enabling client reuse optimization. Instead of + restarting clients for each pytest test (expensive), simulators can start clients + under this shared test context and reuse them across multiple tests (fast). + + The shared test lives for the entire module and prevents Hive from terminating + clients between individual pytest tests. This is essential for simulators that + batch multiple pytest tests against the same client instance. + + Usage: + Simulators can start clients using this shared test context via: + - Direct fixture dependency: `shared_hive_test: HiveTest` parameter + - Implicit dependency: `@pytest.mark.usefixtures("shared_hive_test")` + + Yields: + `HiveTest` instance that persists for the module scope. + + Example: + ```python + @pytest.fixture(scope="function") + def client(shared_hive_test: HiveTest, ...) -> Client: + # Start client on shared test (won't be killed between tests) + client = shared_hive_test.start_client(...)
+ yield client + # Client lifecycle managed by simulator + ``` + """ + logger.info( + f"Creating shared Hive test for '{test_suite_name}' (module scope, client reuse)" + ) + test: HiveTest = test_suite.start_test( + name=f"{test_suite_name}-shared-clients", + description=f"Shared test context for {test_suite_name} client management", + ) + logger.info(f"Shared Hive test created: {test.id}") + yield test + + # End the shared test at module end + # Note: Simulators should manage client lifecycle themselves + # (e.g., stop clients when done, not rely on this teardown) + logger.info(f"Ending shared Hive test for '{test_suite_name}'") + test.end( + result=HiveTestResult( + test_pass=True, details="Shared test context completed" + ) + ) + + @pytest.fixture(scope="function") def hive_test( request: pytest.FixtureRequest, test_suite: HiveTestSuite diff --git a/packages/testing/src/execution_testing/cli/pytest_commands/processors.py b/packages/testing/src/execution_testing/cli/pytest_commands/processors.py index dd10fac25a..3d6e5179af 100644 --- a/packages/testing/src/execution_testing/cli/pytest_commands/processors.py +++ b/packages/testing/src/execution_testing/cli/pytest_commands/processors.py @@ -105,6 +105,14 @@ def process_args(self, args: List[str]) -> List[str]: ] and not self._has_parallelism_flag(args): modified_args.extend(["-n", str(hive_parallelism)]) + # For enginex: ensure xdist uses loadgroup distribution so tests with + # the same xdist_group marker (pre-alloc group) run on the same worker + if self.command_name == "enginex" and self._has_parallelism_flag( + modified_args + ): + if "--dist" not in modified_args: + modified_args.extend(["--dist", "loadgroup"]) + if os.getenv("HIVE_RANDOM_SEED") is not None: warnings.warn( "HIVE_RANDOM_SEED is not yet supported.", stacklevel=2 @@ -120,6 +128,13 @@ def process_args(self, args: List[str]) -> List[str]: "execution_testing.cli.pytest_commands.plugins.consume.simulators.engine.conftest", ] ) + elif 
self.command_name == "enginex": + modified_args.extend( + [ + "-p", + "execution_testing.cli.pytest_commands.plugins.consume.simulators.enginex.conftest", + ] + ) elif self.command_name == "sync": modified_args.extend( [ diff --git a/vulture_whitelist.py b/vulture_whitelist.py index 2d7509f2c5..a336ba6ca1 100644 --- a/vulture_whitelist.py +++ b/vulture_whitelist.py @@ -141,3 +141,8 @@ CommentReplaceCommand.transform_module_impl _children # unused attribute (src/ethereum_spec_tools/docc.py:751) + +# enginex/conftest.py - pytest fixtures (not direct calls) +_configure_client_manager # autouse fixture +test_suite_name # hive test suite name fixture +genesis_header # genesis header fixture diff --git a/whitelist.txt b/whitelist.txt index 4f73730fd5..343f6ea780 100644 --- a/whitelist.txt +++ b/whitelist.txt @@ -477,6 +477,7 @@ encodings endian endianness EngineAPI +enginex enum env envvar