diff --git a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/filler/filler.py b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/filler/filler.py
index bd2490f820..7dfef105de 100644
--- a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/filler/filler.py
+++ b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/filler/filler.py
@@ -6,10 +6,12 @@
 and writes the generated fixtures to file.
 """
 
+import atexit
 import configparser
 import datetime
 import json
 import os
+import signal
 import warnings
 from dataclasses import dataclass, field
 from pathlib import Path
@@ -35,6 +37,8 @@
 from execution_testing.client_clis.clis.geth import FixtureConsumerTool
 from execution_testing.fixtures import (
     BaseFixture,
+    BlockchainEngineFixture,
+    BlockchainFixture,
     FixtureCollector,
     FixtureConsumer,
     FixtureFillingPhase,
@@ -43,6 +47,7 @@
     PreAllocGroupBuilder,
     PreAllocGroupBuilders,
     PreAllocGroups,
+    StateFixture,
     TestInfo,
     merge_partial_fixture_files,
 )
@@ -70,6 +75,47 @@
 )
 from .fixture_output import FixtureOutput
 
+# Fixture output dir for keyboard interrupt cleanup (set in pytest_configure).
+# Used by _merge_on_exit to merge partial JSONL files on Ctrl+C or SIGTERM.
+_fixture_output_dir: Path | None = None
+_atexit_registered: bool = False
+_interrupt_count: int = 0
+_original_sigint_handler: Any = None
+_original_sigterm_handler: Any = None
+
+
+def _termination_handler(signum: int, frame: Any) -> None:
+    """Handle SIGINT/SIGTERM gracefully during test filling."""
+    del frame
+    global _interrupt_count
+    global _original_sigint_handler, _original_sigterm_handler
+    _interrupt_count += 1
+
+    if _interrupt_count == 1:
+        # First interrupt: restore original handlers and re-raise
+        if _original_sigint_handler is not None:
+            signal.signal(signal.SIGINT, _original_sigint_handler)
+        if _original_sigterm_handler is not None:
+            signal.signal(signal.SIGTERM, _original_sigterm_handler)
+        if signum == signal.SIGTERM:
+            raise SystemExit(128 + signum)
+        raise KeyboardInterrupt
+    # Subsequent interrupts: ignore and print message
+    print("\nMerging fixtures, please wait...", flush=True)
+
+
+def _merge_on_exit() -> None:
+    """Atexit handler to merge partial JSONL files. Ignores signals."""
+    global _fixture_output_dir
+    if _fixture_output_dir is not None:
+        signal.signal(signal.SIGINT, signal.SIG_IGN)
+        signal.signal(signal.SIGTERM, signal.SIG_IGN)
+        merge_partial_fixture_files(_fixture_output_dir)
+        # Also merge index if partial indexes exist
+        meta_dir = _fixture_output_dir / ".meta"
+        if meta_dir.exists() and any(meta_dir.glob("partial_index*.jsonl")):
+            merge_partial_indexes(_fixture_output_dir, quiet_mode=True)
+
 
 @dataclass(kw_only=True)
 class PhaseManager:
@@ -706,6 +752,24 @@ def pytest_configure(config: pytest.Config) -> None:
         except ValueError as e:
             pytest.exit(str(e), returncode=pytest.ExitCode.USAGE_ERROR)
 
+    # Register atexit/signal handlers for cleanup (master only, not workers).
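+    # Workers stream their own partial files; only the master process merges
+    # them at exit, so these handlers are skipped on xdist workers.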
+    global _fixture_output_dir, _atexit_registered
+    global _original_sigint_handler, _original_sigterm_handler
+    is_xdist_worker = hasattr(config, "workerinput")
+    if not config.fixture_output.is_stdout:  # type: ignore[attr-defined]
+        _fixture_output_dir = config.fixture_output.directory  # type: ignore[attr-defined]
+        if not _atexit_registered and not is_xdist_worker:
+            atexit.register(_merge_on_exit)
+            _original_sigint_handler = signal.signal(
+                signal.SIGINT, _termination_handler
+            )
+            _original_sigterm_handler = signal.signal(
+                signal.SIGTERM, _termination_handler
+            )
+            _atexit_registered = True
+
     if (
         not config.getoption("disable_html")
         and config.getoption("htmlpath") is None
@@ -1047,7 +1109,11 @@ def evm_fixture_verification(
         verify_fixtures_bin = evm_bin
         reused_evm_bin = True
     if not verify_fixtures_bin:
-        return
+        pytest.exit(
+            "--verify-fixtures requires --evm-bin or --verify-fixtures-bin "
+            "to be specified.",
+            returncode=pytest.ExitCode.USAGE_ERROR,
+        )
     try:
         evm_fixture_verification = FixtureConsumerTool.from_binary_path(
             binary_path=Path(verify_fixtures_bin),
@@ -1241,13 +1307,16 @@ def fixture_collector(
         generate_index=request.config.getoption("generate_index"),
     )
     yield fixture_collector
-    worker_id = os.environ.get("PYTEST_XDIST_WORKER", None)
-    fixture_collector.dump_fixtures(worker_id)
-    if do_fixture_verification:
-        fixture_collector.verify_fixture_files(evm_fixture_verification)
-    # Write partial index for this worker/scope
-    if fixture_collector.generate_index:
-        fixture_collector.write_partial_index(worker_id)
+    try:
+        # dump_fixtures() is only needed for stdout mode
+        fixture_collector.dump_fixtures()
+        # Verify fixtures for stdout mode only (fixtures are held in memory).
+        # For file mode, verification happens at session finish after merge.
+        if do_fixture_verification and fixture_output.is_stdout:
+            fixture_collector.verify_fixture_files(evm_fixture_verification)
+    finally:
+        # Always close streaming file handles, even on error
+        fixture_collector.close_streaming_files()
 
 
 @pytest.fixture(autouse=True, scope="session")
@@ -1609,6 +1678,66 @@ def pytest_collection_modifyitems(
     items[:] = slow_items + normal_items
 
 
+def _verify_fixtures_post_merge(
+    config: pytest.Config, output_dir: Path
+) -> None:
+    """
+    Verify fixtures after merge if verification is enabled.
+
+    Called from pytest_sessionfinish after partial files are merged into
+    final JSON fixtures. Runs evm statetest/blocktest on each fixture.
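+    The fixture format is inferred from each file's top-level directory.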
+ """ + if not config.getoption("verify_fixtures"): + return + + # Get the verification binary (same logic as evm_fixture_verification) + verify_fixtures_bin = config.getoption("verify_fixtures_bin") + if not verify_fixtures_bin: + verify_fixtures_bin = config.getoption("evm_bin") + if not verify_fixtures_bin: + return + + try: + evm_verification = FixtureConsumerTool.from_binary_path( + binary_path=Path(verify_fixtures_bin), + trace=getattr(config, "collect_traces", False), + ) + except Exception: + # Binary not recognized, skip verification (error already shown + # during fixture setup if --verify-fixtures was used) + return + + # Map directory names to fixture format classes + dir_to_format: dict[str, type[BaseFixture]] = { + StateFixture.output_base_dir_name(): StateFixture, + BlockchainFixture.output_base_dir_name(): BlockchainFixture, + BlockchainEngineFixture.output_base_dir_name(): ( + BlockchainEngineFixture + ), + } + + # Find all JSON fixture files and verify them + for json_file in output_dir.rglob("*.json"): + # Determine fixture format from top-level directory + relative_path = json_file.relative_to(output_dir) + if not relative_path.parts: + continue + + top_dir = relative_path.parts[0] + fixture_format = dir_to_format.get(top_dir) + if fixture_format is None: + continue + + if evm_verification.can_consume(fixture_format): + evm_verification.consume_fixture( + fixture_format, + json_file, + fixture_name=None, + debug_output_path=None, + ) + + def pytest_sessionfinish(session: pytest.Session, exitstatus: int) -> None: """ Perform session finish tasks. @@ -1656,6 +1784,9 @@ def pytest_sessionfinish(session: pytest.Session, exitstatus: int) -> None: for file in fixture_output.directory.rglob("*.lock"): file.unlink() + # Verify fixtures after merge if verification is enabled + _verify_fixtures_post_merge(session.config, fixture_output.directory) + # Generate index file for all produced fixtures by merging partial indexes. # Only merge if partial indexes were actually written (i.e., tests produced # fixtures). When no tests are filled (e.g., all skipped), no partial diff --git a/packages/testing/src/execution_testing/fixtures/collector.py b/packages/testing/src/execution_testing/fixtures/collector.py index 74bdb6b471..c7987663f5 100644 --- a/packages/testing/src/execution_testing/fixtures/collector.py +++ b/packages/testing/src/execution_testing/fixtures/collector.py @@ -10,6 +10,7 @@ from dataclasses import dataclass, field from pathlib import Path from typing import ( + IO, ClassVar, Dict, List, @@ -18,8 +19,6 @@ Tuple, ) -from filelock import FileLock - from execution_testing.base_types import to_json from .base import BaseFixture @@ -197,15 +196,23 @@ class FixtureCollector: single_fixture_per_file: bool filler_path: Path base_dump_dir: Optional[Path] = None - flush_interval: int = 1000 generate_index: bool = True + # Worker ID for partial files. None = read from env var. 
+    worker_id: Optional[str] = None
 
-    # Internal state
+    # Internal state (only used for stdout mode)
     all_fixtures: Dict[Path, Fixtures] = field(default_factory=dict)
-    json_path_to_test_item: Dict[Path, TestInfo] = field(default_factory=dict)
-    # Store index entries as simple dicts
-    # (avoid Pydantic overhead during collection)
-    index_entries: List[Dict] = field(default_factory=list)
+
+    # Streaming file handles - kept open for module duration
+    _partial_fixture_files: Dict[Path, IO[str]] = field(default_factory=dict)
+    _partial_index_file: Optional[IO[str]] = field(default=None)
+    _worker_id_cached: bool = field(default=False, init=False)
+
+    # Lightweight tracking for verification (path, format class, debug_path)
+    # Only stores metadata, not fixture data - memory efficient
+    _fixtures_to_verify: List[Tuple[Path, type, Optional[Path]]] = field(
+        default_factory=list
+    )
 
     def get_fixture_basename(self, info: TestInfo) -> Path:
         """Return basename of the fixture file for a given test case."""
@@ -227,8 +234,20 @@ def get_fixture_basename(self, info: TestInfo) -> Path:
                 mode="module"
             )
 
+    def _get_worker_id(self) -> str | None:
+        """Get the worker ID (from constructor or environment)."""
+        if self.worker_id is not None:
+            return self.worker_id
+        if not self._worker_id_cached:
+            # Cache the env var lookup
+            env_worker_id = os.environ.get("PYTEST_XDIST_WORKER")
+            if env_worker_id:
+                self.worker_id = env_worker_id
+            self._worker_id_cached = True
+        return self.worker_id
+
     def add_fixture(self, info: TestInfo, fixture: BaseFixture) -> Path:
-        """Add fixture to the list of fixtures of a given test case."""
+        """Add fixture and immediately stream to partial JSONL file."""
         fixture_basename = self.get_fixture_basename(info)
 
         fixture_path = (
@@ -236,16 +255,25 @@ def add_fixture(self, info: TestInfo, fixture: BaseFixture) -> Path:
             / fixture.output_base_dir_name()
             / fixture_basename.with_suffix(fixture.output_file_extension)
         )
-        # relevant when we group by test function
-        if fixture_path not in self.all_fixtures.keys():
-            self.all_fixtures[fixture_path] = Fixtures(root={})
-            self.json_path_to_test_item[fixture_path] = info
-        self.all_fixtures[fixture_path][info.get_id()] = fixture
+        # Stream fixture directly to partial JSONL (no memory accumulation)
+        if self.output_dir.name != "stdout":
+            self._stream_fixture_to_partial(
+                fixture_path, info.get_id(), fixture
+            )
+            # Track for verification (lightweight - only path and format class)
+            debug_path = self._get_consume_direct_dump_dir(info)
+            self._fixtures_to_verify.append(
+                (fixture_path, fixture.__class__, debug_path)
+            )
+        else:
+            # stdout mode: accumulate for final JSON dump
+            if fixture_path not in self.all_fixtures:
+                self.all_fixtures[fixture_path] = Fixtures(root={})
+            self.all_fixtures[fixture_path][info.get_id()] = fixture
 
-        # Collect index entry while data is in memory (if indexing enabled)
-        # Store as simple dict to avoid Pydantic overhead during collection
-        if self.generate_index:
+        # Stream index entry directly to partial JSONL
+        if self.generate_index and self.output_dir.name != "stdout":
             relative_path = fixture_path.relative_to(self.output_dir)
             fixture_fork = fixture.get_fork()
             index_entry = {
@@ -257,18 +285,67 @@
             }
             if (pre_hash := getattr(fixture, "pre_hash", None)) is not None:
                 index_entry["pre_hash"] = pre_hash
-            self.index_entries.append(index_entry)
-
-        if (
-            self.flush_interval > 0
-            and len(self.all_fixtures) >= self.flush_interval
-        ):
-            self.dump_fixtures()
+            self._stream_index_entry_to_partial(index_entry)
 
         return fixture_path
 
-    def dump_fixtures(self, worker_id: str | None = None) -> None:
-        """Dump all collected fixtures to their respective files."""
+    def _get_partial_fixture_file(self, fixture_path: Path) -> "IO[str]":
+        """Get or create a file handle for streaming fixtures."""
+        worker_id = self._get_worker_id()
+        suffix = f".{worker_id}" if worker_id else ".main"
+        partial_path = fixture_path.with_suffix(f".partial{suffix}.jsonl")
+
+        if partial_path not in self._partial_fixture_files:
+            partial_path.parent.mkdir(parents=True, exist_ok=True)
+            self._partial_fixture_files[partial_path] = open(partial_path, "a")
+
+        return self._partial_fixture_files[partial_path]
+
+    def _stream_fixture_to_partial(
+        self,
+        fixture_path: Path,
+        fixture_id: str,
+        fixture: BaseFixture,
+    ) -> None:
+        """Stream a single fixture to its partial JSONL file."""
+        value = json.dumps(fixture.json_dict_with_info(), indent=4)
+        line = json.dumps({"k": fixture_id, "v": value}) + "\n"
+
+        f = self._get_partial_fixture_file(fixture_path)
+        f.write(line)
+        f.flush()  # Ensure data is written immediately
+
+    def _get_partial_index_file(self) -> "IO[str]":
+        """Get or create the file handle for streaming index entries."""
+        if self._partial_index_file is None:
+            worker_id = self._get_worker_id()
+            suffix = f".{worker_id}" if worker_id else ".main"
+            partial_index_path = (
+                self.output_dir / ".meta" / f"partial_index{suffix}.jsonl"
+            )
+            partial_index_path.parent.mkdir(parents=True, exist_ok=True)
+            self._partial_index_file = open(partial_index_path, "a")
+
+        return self._partial_index_file
+
+    def _stream_index_entry_to_partial(self, entry: Dict) -> None:
+        """Stream a single index entry to partial JSONL file."""
+        f = self._get_partial_index_file()
+        f.write(json.dumps(entry) + "\n")
+        f.flush()  # Ensure data is written immediately
+
+    def close_streaming_files(self) -> None:
+        """Close all open streaming file handles."""
+        for f in self._partial_fixture_files.values():
+            f.close()
+        self._partial_fixture_files.clear()
+
+        if self._partial_index_file is not None:
+            self._partial_index_file.close()
+            self._partial_index_file = None
+
+    def dump_fixtures(self) -> None:
+        """Dump collected fixtures (only used for stdout mode)."""
         if self.output_dir.name == "stdout":
             combined_fixtures = {
                 k: to_json(v)
@@ -276,65 +353,10 @@ def dump_fixtures(self, worker_id: str | None = None) -> None:
                 for k, v in fixture.items()
             }
             json.dump(combined_fixtures, sys.stdout, indent=4)
-            return
-        os.makedirs(self.output_dir, exist_ok=True)
-        for fixture_path, fixtures in self.all_fixtures.items():
-            os.makedirs(fixture_path.parent, exist_ok=True)
-            if len({fixture.__class__ for fixture in fixtures.values()}) != 1:
-                raise TypeError(
-                    "All fixtures in a single file must have the same format."
-                )
-            self._write_partial_fixtures(fixture_path, fixtures, worker_id)
-
-        self.all_fixtures.clear()
-
-    def _write_partial_fixtures(
-        self, file_path: Path, fixtures: Fixtures, worker_id: str | None
-    ) -> None:
-        """
-        Write fixtures to a partial JSONL file (append-only).
-
-        Each line is a JSON object: {"key": "fixture_id", "value": "json_str"}
-        This avoids O(n) merge work per worker - just O(1) append.
-        Final merge to JSON happens at session end.
- """ - suffix = f".{worker_id}" if worker_id else ".main" - partial_path = file_path.with_suffix(f".partial{suffix}.jsonl") - partial_path.parent.mkdir(parents=True, exist_ok=True) - lock_file_path = partial_path.with_suffix(".lock") - - lines = [] - for name in fixtures: - value = json.dumps(fixtures[name].json_dict_with_info(), indent=4) - # Store as JSONL: {"k": key, "v": serialized value string} - lines.append(json.dumps({"k": name, "v": value}) + "\n") - - with FileLock(lock_file_path): - with open(partial_path, "a") as f: - f.writelines(lines) + self.all_fixtures.clear() + # For file output, fixtures are already streamed in add_fixture() - def verify_fixture_files( - self, evm_fixture_verification: FixtureConsumer - ) -> None: - """Run `evm [state|block]test` on each fixture.""" - for fixture_path, name_fixture_dict in self.all_fixtures.items(): - for _fixture_name, fixture in name_fixture_dict.items(): - if evm_fixture_verification.can_consume(fixture.__class__): - info = self.json_path_to_test_item[fixture_path] - consume_direct_dump_dir = ( - self._get_consume_direct_dump_dir(info) - ) - evm_fixture_verification.consume_fixture( - fixture.__class__, - fixture_path, - fixture_name=None, - debug_output_path=consume_direct_dump_dir, - ) - - def _get_consume_direct_dump_dir( - self, - info: TestInfo, - ) -> Path | None: + def _get_consume_direct_dump_dir(self, info: TestInfo) -> Path | None: """ Directory to dump the current test function's fixture.json and fixture verification debug output. @@ -350,37 +372,36 @@ def _get_consume_direct_dump_dir( self.base_dump_dir, self.filler_path, level="test_function" ) - def write_partial_index(self, worker_id: str | None = None) -> Path | None: + def verify_fixture_files( + self, evm_fixture_verification: FixtureConsumer + ) -> None: """ - Append collected index entries to a partial index file using JSONL - format. - - Uses append-only JSONL (JSON Lines) format for efficient writes without - read-modify-write cycles. Each line is a complete JSON object - representing one index entry. - - Args: - worker_id: The xdist worker ID (e.g., "gw0"), or None for master. - - Returns: - Path to the partial index file, or None if indexing is disabled. + Run `evm [state|block]test` on each fixture. + For streaming mode, uses lightweight tracking of fixture paths/formats + rather than keeping full fixtures in memory. 
""" - if not self.generate_index or not self.index_entries: - return None - - suffix = f".{worker_id}" if worker_id else ".master" - partial_index_path = ( - self.output_dir / ".meta" / f"partial_index{suffix}.jsonl" - ) - partial_index_path.parent.mkdir(parents=True, exist_ok=True) - lock_file_path = partial_index_path.with_suffix(".lock") - - # Append entries as JSONL (one JSON object per line) - # This avoids read-modify-write cycles - with FileLock(lock_file_path): - with open(partial_index_path, "a") as f: - for entry in self.index_entries: - f.write(json.dumps(entry) + "\n") - - return partial_index_path + if self.output_dir.name == "stdout": + # stdout mode: fixtures are in memory + for fixture_path, name_fixture_dict in self.all_fixtures.items(): + for _fixture_name, fixture in name_fixture_dict.items(): + if evm_fixture_verification.can_consume(fixture.__class__): + evm_fixture_verification.consume_fixture( + fixture.__class__, + fixture_path, + fixture_name=None, + debug_output_path=None, + ) + else: + # Streaming mode: use tracked fixture metadata + for entry in self._fixtures_to_verify: + fixture_path, fixture_format, debug_path = entry + if evm_fixture_verification.can_consume(fixture_format): + evm_fixture_verification.consume_fixture( + fixture_format, + fixture_path, + fixture_name=None, + debug_output_path=debug_path, + ) + # Clear tracking after verification + self._fixtures_to_verify.clear() diff --git a/packages/testing/src/execution_testing/fixtures/tests/test_collector.py b/packages/testing/src/execution_testing/fixtures/tests/test_collector.py index 87e55e6f89..4b41f1c6d4 100644 --- a/packages/testing/src/execution_testing/fixtures/tests/test_collector.py +++ b/packages/testing/src/execution_testing/fixtures/tests/test_collector.py @@ -79,7 +79,7 @@ def test_single_fixture_matches_json_dumps( fixture = _make_fixture(1) info = _make_info("tx_test", module_path) collector.add_fixture(info, fixture) - collector.dump_fixtures(worker_id="gw0") + collector.dump_fixtures() merge_partial_fixture_files(output_dir) # Find the written file @@ -113,7 +113,7 @@ def test_multiple_fixtures_match_json_dumps( collector.add_fixture(info, fixture) fixtures_and_infos.append((info, fixture)) - collector.dump_fixtures(worker_id="gw0") + collector.dump_fixtures() merge_partial_fixture_files(output_dir) json_files = list(output_dir.rglob("*.json")) @@ -140,6 +140,7 @@ def test_multiple_workers_merge_correctly( single_fixture_per_file=False, filler_path=filler_path, generate_index=False, + worker_id="gw0", ) # Worker A writes fixtures 0-2 pairs_a = [] @@ -148,7 +149,7 @@ def test_multiple_workers_merge_correctly( info = _make_info(f"tx_test_{i}", module_path) collector1.add_fixture(info, fixture) pairs_a.append((info, fixture)) - collector1.dump_fixtures(worker_id="gw0") + collector1.close_streaming_files() # Worker B writes fixtures 3-5 (separate partial file) collector2 = FixtureCollector( @@ -157,6 +158,7 @@ def test_multiple_workers_merge_correctly( single_fixture_per_file=False, filler_path=filler_path, generate_index=False, + worker_id="gw1", ) pairs_b = [] for i in range(3, 6): @@ -164,7 +166,7 @@ def test_multiple_workers_merge_correctly( info = _make_info(f"tx_test_{i}", module_path) collector2.add_fixture(info, fixture) pairs_b.append((info, fixture)) - collector2.dump_fixtures(worker_id="gw1") + collector2.close_streaming_files() # Merge at session end merge_partial_fixture_files(output_dir) @@ -197,7 +199,7 @@ def test_output_is_valid_json( info = _make_info(f"tx_test_{i}", 
module_path) collector.add_fixture(info, fixture) - collector.dump_fixtures(worker_id="gw0") + collector.dump_fixtures() merge_partial_fixture_files(output_dir) json_files = list(output_dir.rglob("*.json")) @@ -223,7 +225,7 @@ def test_fixtures_sorted_by_key( info = _make_info(f"tx_test_{i}", module_path) collector.add_fixture(info, fixture) - collector.dump_fixtures(worker_id="gw0") + collector.dump_fixtures() merge_partial_fixture_files(output_dir) json_files = list(output_dir.rglob("*.json")) @@ -247,7 +249,7 @@ def test_partial_files_cleaned_up_after_merge( fixture = _make_fixture(1) info = _make_info("tx_test", module_path) collector.add_fixture(info, fixture) - collector.dump_fixtures(worker_id="gw0") + collector.dump_fixtures() # Verify partial file exists before merge partial_files = list(output_dir.rglob("*.partial.*.jsonl")) @@ -293,7 +295,7 @@ def test_single_fixture_matches_legacy( generate_index=False, ) collector.add_fixture(info, fixture) - collector.dump_fixtures(worker_id="gw0") + collector.dump_fixtures() merge_partial_fixture_files(new_dir) new_files = list(new_dir.rglob("*.json")) assert len(new_files) == 1 @@ -333,7 +335,7 @@ def test_multiple_fixtures_match_legacy( ) for i, info in enumerate(infos): collector.add_fixture(info, list(fixtures_dict.values())[i]) - collector.dump_fixtures(worker_id="gw0") + collector.dump_fixtures() merge_partial_fixture_files(new_dir) new_files = list(new_dir.rglob("*.json")) assert len(new_files) == 1 @@ -374,11 +376,12 @@ def test_multiple_workers_match_legacy( single_fixture_per_file=False, filler_path=filler_path, generate_index=False, + worker_id=f"gw{worker_idx}", ) start = worker_idx * 2 for i in range(start, start + 2): collector.add_fixture(infos[i], fixture_values[i]) - collector.dump_fixtures(worker_id=f"gw{worker_idx}") + collector.close_streaming_files() merge_partial_fixture_files(new_dir) new_files = list(new_dir.rglob("*.json")) @@ -426,7 +429,7 @@ def test_special_characters_in_keys_match_legacy( ) for i, info in enumerate(infos): collector.add_fixture(info, list(fixtures_dict.values())[i]) - collector.dump_fixtures(worker_id="gw0") + collector.dump_fixtures() merge_partial_fixture_files(new_dir) new_files = list(new_dir.rglob("*.json")) assert len(new_files) == 1