diff --git a/.github/actions/build-fixtures/action.yaml b/.github/actions/build-fixtures/action.yaml index a9fec3371cd..f80b0575196 100644 --- a/.github/actions/build-fixtures/action.yaml +++ b/.github/actions/build-fixtures/action.yaml @@ -32,6 +32,9 @@ runs: id: evm-builder with: type: ${{ steps.properties.outputs.evm-type }} + - name: Install pigz for parallel tarball compression + shell: bash + run: sudo apt-get install -y pigz - name: Generate fixtures using fill shell: bash run: | diff --git a/.github/configs/feature.yaml b/.github/configs/feature.yaml index 2c70dfddb77..efd6442ab55 100644 --- a/.github/configs/feature.yaml +++ b/.github/configs/feature.yaml @@ -1,22 +1,22 @@ # Unless filling for special features, all features should fill for previous forks (starting from Frontier) too stable: evm-type: stable - fill-params: --no-html --until=Prague --fill-static-tests --ignore=tests/static/state_tests/stQuadraticComplexityTest + fill-params: --until=Prague --fill-static-tests --ignore=tests/static/state_tests/stQuadraticComplexityTest --no-html --durations=50 develop: evm-type: develop - fill-params: --no-html --until=BPO4 --fill-static-tests --ignore=tests/static/state_tests/stQuadraticComplexityTest + fill-params: --until=BPO4 --fill-static-tests --ignore=tests/static/state_tests/stQuadraticComplexityTest --no-html --durations=50 benchmark: evm-type: benchmark - fill-params: --no-html --fork=Osaka --gas-benchmark-values 1,5,10,30,60,100,150 -m benchmark ./tests/benchmark --maxprocesses=30 --durations=50 + fill-params: --fork=Osaka --gas-benchmark-values 1,5,10,30,60,100,150 -m benchmark ./tests/benchmark --no-html --durations=50 --maxprocesses=30 --dist=worksteal benchmark_fast: evm-type: benchmark - fill-params: --no-html --fork=Osaka --gas-benchmark-values 100 -m "benchmark" ./tests/benchmark + fill-params: --fork=Osaka --gas-benchmark-values 100 -m "benchmark" ./tests/benchmark --no-html --durations=50 feature_only: true bal: evm-type: develop - fill-params: --no-html --fork=Amsterdam --fill-static-tests + fill-params: --fork=Amsterdam --fill-static-tests --no-html --durations=50 feature_only: true diff --git a/packages/testing/src/execution_testing/cli/gen_index.py b/packages/testing/src/execution_testing/cli/gen_index.py index 3a95688d5e5..c8551fc0674 100644 --- a/packages/testing/src/execution_testing/cli/gen_index.py +++ b/packages/testing/src/execution_testing/cli/gen_index.py @@ -234,7 +234,11 @@ def merge_partial_indexes(output_dir: Path, quiet_mode: bool = False) -> None: workers have finished and written their partial indexes. Partial indexes use JSONL format (one JSON object per line) for efficient - append-only writes during fill. Entries are validated with Pydantic here. + append-only writes during fill. + + Memory-optimized: Builds hash trie directly while streaming entries, + avoiding accumulation of all entries in a single list. Writes final + JSON by re-reading partials (2x I/O but ~50% less peak memory). Args: output_dir: The fixture output directory. @@ -247,12 +251,12 @@ def merge_partial_indexes(output_dir: Path, quiet_mode: bool = False) -> None: if not partial_files: raise Exception("No partial indexes found.") - # Merge all partial indexes (JSONL format: one entry per line) - # Read as raw dicts — the data was already validated when collected - # from live Pydantic fixture objects in add_fixture(). - all_raw_entries: list[dict] = [] + # Pass 1: Build hash trie directly while streaming (no intermediate list) + # Only keep what's needed for hash computation: path parts and fixture_hash + root_trie: dict = {} all_forks: set = set() all_formats: set = set() + test_count = 0 for partial_file in partial_files: with open(partial_file) as f: @@ -260,39 +264,89 @@ def merge_partial_indexes(output_dir: Path, quiet_mode: bool = False) -> None: line = line.strip() if not line: continue - entry_data = json.loads(line) - all_raw_entries.append(entry_data) - # Collect forks and formats from raw strings - if entry_data.get("fork"): - all_forks.add(entry_data["fork"]) - if entry_data.get("format"): - all_formats.add(entry_data["format"]) - - # Compute root hash from raw dicts (no Pydantic needed) - root_hash = HashableItem.from_raw_entries(all_raw_entries).hash() - - # Build final index — Pydantic validates the entire structure once - # via model_validate(), not 96k individual model_validate() calls. - index = IndexFile.model_validate( - { - "test_cases": all_raw_entries, - "root_hash": HexNumber(root_hash), - "created_at": datetime.datetime.now(), - "test_count": len(all_raw_entries), - "forks": list(all_forks), - "fixture_formats": list(all_formats), - } - ) + entry = json.loads(line) + test_count += 1 + + # Collect metadata + if entry.get("fork"): + all_forks.add(entry["fork"]) + if entry.get("format"): + all_formats.add(entry["format"]) + + # Insert directly into trie for hash computation + fixture_hash = entry.get("fixture_hash") + if not fixture_hash: + continue + + path_parts = Path(entry["json_path"]).parts + current = root_trie + + # Navigate to parent folder, creating nodes as needed + for part in path_parts[:-1]: + if part not in current: + current[part] = {} + current = current[part] + + # Add test entry to file node + file_name = path_parts[-1] + if file_name not in current: + current[file_name] = [] - # Write final index + hash_bytes = int(fixture_hash, 16).to_bytes(32, "big") + current[file_name].append((entry["id"], hash_bytes)) + + # Compute root hash from trie (reusing hasher's trie_to_hashable logic) + root_hash = _trie_to_hash(root_trie) + + # Free trie memory before pass 2 + del root_trie + + # Pass 2: Stream entries to final JSON file (re-read partials) + # This avoids keeping all entries in memory simultaneously index_path = meta_dir / "index.json" index_path.parent.mkdir(parents=True, exist_ok=True) - index_path.write_text(index.model_dump_json(exclude_none=True, indent=2)) + + with open(index_path, "w") as out_f: + # Write header + out_f.write("{\n") + out_f.write(f' "root_hash": "0x{root_hash.hex()}",\n') + out_f.write( + f' "created_at": "{datetime.datetime.now().isoformat()}",\n' + ) + out_f.write(f' "test_count": {test_count},\n') + out_f.write(f' "forks": {json.dumps(sorted(all_forks))},\n') + out_f.write( + f' "fixture_formats": {json.dumps(sorted(all_formats))},\n' + ) + out_f.write(' "test_cases": [\n') + + # Stream test cases from partials (second read) + first_entry = True + for partial_file in partial_files: + with open(partial_file) as f: + for line in f: + line = line.strip() + if not line: + continue + if not first_entry: + out_f.write(",\n") + first_entry = False + # Write entry with indentation + entry = json.loads(line) + entry_json = json.dumps(entry, indent=2) + # Indent each line of the entry + indented = "\n".join( + " " + ln for ln in entry_json.split("\n") + ) + out_f.write(indented) + + out_f.write("\n ]\n") + out_f.write("}") if not quiet_mode: rich.print( f"[green]Merged {len(partial_files)} partial indexes " - f"({len(all_raw_entries)} test cases) into {index_path}[/]" + f"({test_count} test cases) into {index_path}[/]" ) # Cleanup partial files @@ -300,5 +354,34 @@ def merge_partial_indexes(output_dir: Path, quiet_mode: bool = False) -> None: partial_file.unlink() +def _trie_to_hash(root_trie: dict) -> bytes: + """ + Compute hash from trie structure built during streaming. + + Mirrors HashableItem.from_raw_entries logic but works on pre-built trie. + """ + import hashlib + + def hash_node(node: dict) -> bytes: + """Recursively hash a trie node.""" + hash_parts: list[bytes] = [] + + for name in sorted(node.keys()): + child = node[name] + if isinstance(child, list): + # File node: child is list of (test_id, hash_bytes) + # Hash = sha256(sorted test hashes concatenated) + test_hashes = [h for _, h in sorted(child, key=lambda x: x[0])] + file_hash = hashlib.sha256(b"".join(test_hashes)).digest() + hash_parts.append(file_hash) + else: + # Folder node: recurse + hash_parts.append(hash_node(child)) + + return hashlib.sha256(b"".join(hash_parts)).digest() + + return hash_node(root_trie) + + if __name__ == "__main__": generate_fixtures_index_cli() diff --git a/packages/testing/src/execution_testing/cli/hasher.py b/packages/testing/src/execution_testing/cli/hasher.py index 5bd6a9b8e91..4e3b57da14e 100644 --- a/packages/testing/src/execution_testing/cli/hasher.py +++ b/packages/testing/src/execution_testing/cli/hasher.py @@ -44,11 +44,9 @@ def hash(self) -> bytes: return self.root if self.items is None: raise ValueError("No items to hash") - all_hash_bytes = b"" - for _, item in sorted(self.items.items()): - item_hash_bytes = item.hash() - all_hash_bytes += item_hash_bytes - return hashlib.sha256(all_hash_bytes).digest() + # Use list + join instead of += to avoid O(n²) byte concatenation + hash_parts = [item.hash() for _, item in sorted(self.items.items())] + return hashlib.sha256(b"".join(hash_parts)).digest() def format_lines( self, diff --git a/packages/testing/src/execution_testing/cli/pytest_commands/base.py b/packages/testing/src/execution_testing/cli/pytest_commands/base.py index 5a6835fd6be..47891d56527 100644 --- a/packages/testing/src/execution_testing/cli/pytest_commands/base.py +++ b/packages/testing/src/execution_testing/cli/pytest_commands/base.py @@ -94,6 +94,9 @@ def run_multiple(self, executions: List[PytestExecution]) -> int: f"{execution.description}[/bold blue]" ) self.console.rule(phase_text, style="bold blue") + # Flush for CI visibility (GitHub Actions buffers output) + sys.stdout.flush() + sys.stderr.flush() result = self.run_single(execution) if result != 0: diff --git a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/filler/filler.py b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/filler/filler.py index 7dfef105de2..3559776bcd4 100644 --- a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/filler/filler.py +++ b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/filler/filler.py @@ -9,9 +9,12 @@ import atexit import configparser import datetime +import gc import json import os import signal +import sys +import time import warnings from dataclasses import dataclass, field from pathlib import Path @@ -51,6 +54,10 @@ TestInfo, merge_partial_fixture_files, ) +from execution_testing.fixtures.pre_alloc_groups import ( + _get_worker_id, + merge_partial_group_files, +) from execution_testing.forks import ( Fork, get_transition_fork_predecessor, @@ -402,13 +409,17 @@ def update_pre_alloc_group_builder( self.pre_alloc_group_builders.root[hash_key] = group_builder def save_pre_alloc_groups(self) -> None: - """Save pre-allocation groups to disk.""" + """Save pre-allocation groups to disk as partial files.""" if self.pre_alloc_group_builders is None: return pre_alloc_folder = self.fixture_output.pre_alloc_groups_folder_path pre_alloc_folder.mkdir(parents=True, exist_ok=True) - self.pre_alloc_group_builders.to_folder(pre_alloc_folder) + # Pass worker_id so each worker writes its own partial files + # (no lock contention). Master merges them after all workers finish. + self.pre_alloc_group_builders.to_folder( + pre_alloc_folder, worker_id=_get_worker_id() + ) def calculate_post_state_diff( @@ -901,21 +912,30 @@ def pytest_terminal_summary( session_instance: FillingSession = config.filling_session # type: ignore[attr-defined] if session_instance.phase_manager.is_pre_alloc_generation: # Generate summary stats - pre_alloc_groups: PreAllocGroups + # For xdist, count files and accounts without fully loading groups + # (avoids expensive state_root computation just for summary stats) if config.pluginmanager.hasplugin("xdist"): - # Load pre-allocation groups from disk - pre_alloc_groups = PreAllocGroups.from_folder( - config.fixture_output.pre_alloc_groups_folder_path, # type: ignore[attr-defined] - lazy_load=False, + pre_alloc_folder = ( + config.fixture_output.pre_alloc_groups_folder_path # type: ignore[attr-defined] ) + group_files = list(pre_alloc_folder.glob("*.json")) + total_groups = len(group_files) + # Count accounts by loading as builder (no genesis computation) + total_accounts = 0 + for group_file in group_files: + builder = PreAllocGroupBuilder.model_validate_json( + group_file.read_text() + ) + total_accounts += builder.get_pre_account_count() else: - assert session_instance.pre_alloc_groups is not None - pre_alloc_groups = session_instance.pre_alloc_groups - - total_groups = len(pre_alloc_groups.root) - total_accounts = sum( - group.pre_account_count for group in pre_alloc_groups.values() - ) + assert session_instance.pre_alloc_group_builders is not None + total_groups = len( + session_instance.pre_alloc_group_builders.root + ) + total_accounts = sum( + builder.get_pre_account_count() + for builder in session_instance.pre_alloc_group_builders.root.values() # noqa: E501 + ) terminalreporter.write_sep( "=", @@ -1746,13 +1766,45 @@ def pytest_sessionfinish(session: pytest.Session, exitstatus: int) -> None: - Generate index file for all produced fixtures. - Create tarball of the output directory if the output is a tarball. """ + + def _log_timing(msg: str) -> None: + """Log with timestamp and flush immediately for CI visibility.""" + log_line = f"[sessionfinish] {time.strftime('%H:%M:%S')} {msg}" + # Print to stderr (unbuffered) for immediate CI visibility + print(log_line, file=sys.stderr, flush=True) + + # Log immediately when hook is entered (before any early returns) + is_worker = xdist.is_xdist_worker(session) + _log_timing(f"pytest_sessionfinish ENTERED (worker={is_worker})") + del exitstatus # Save pre-allocation groups after phase 1 fixture_output: FixtureOutput = session.config.fixture_output # type: ignore[attr-defined] session_instance: FillingSession = session.config.filling_session # type: ignore[attr-defined] if session_instance.phase_manager.is_pre_alloc_generation: + _log_timing("Phase 1: saving pre-alloc groups (partial)...") + t0 = time.time() session_instance.save_pre_alloc_groups() + _log_timing( + f"Phase 1: save_pre_alloc_groups done in {time.time() - t0:.1f}s" + ) + + # Master merges all worker partial files after all workers finish + if not is_worker: + _log_timing("Phase 1 (master): merging partial group files...") + t0 = time.time() + pre_alloc_folder = fixture_output.pre_alloc_groups_folder_path + merge_partial_group_files(pre_alloc_folder) + _log_timing( + f"Phase 1 (master): merge done in {time.time() - t0:.1f}s" + ) + else: + # Workers: clear in-memory state to reduce memory pressure while + # waiting for other workers to finish + session_instance.pre_alloc_group_builders = None + gc.collect() + return if session.config.getoption("optimize_gas", False): @@ -1771,21 +1823,44 @@ def pytest_sessionfinish(session: pytest.Session, exitstatus: int) -> None: json.dumps(gas_optimized_tests, indent=2, sort_keys=True) ) - if xdist.is_xdist_worker(session): + if is_worker: + # Workers: clear in-memory state to reduce memory pressure while + # waiting for other workers to finish + session_instance.pre_alloc_groups = None + if hasattr(session.config, "fixture_collector"): + fc = session.config.fixture_collector + fc.all_fixtures.clear() + fc._fixtures_to_verify.clear() + gc.collect() return if fixture_output.is_stdout or is_help_or_collectonly_mode(session.config): return + _log_timing("Finalization (master): starting...") + # Merge partial fixture files from all workers into final JSON files + _log_timing("merge_partial_fixture_files: starting...") + t0 = time.time() merge_partial_fixture_files(fixture_output.directory) + _log_timing( + f"merge_partial_fixture_files: done in {time.time() - t0:.1f}s" + ) # Remove any lock files that may have been created. + _log_timing("Removing lock files...") + t0 = time.time() for file in fixture_output.directory.rglob("*.lock"): file.unlink() + _log_timing(f"Lock files removed in {time.time() - t0:.1f}s") # Verify fixtures after merge if verification is enabled + _log_timing("_verify_fixtures_post_merge: starting...") + t0 = time.time() _verify_fixtures_post_merge(session.config, fixture_output.directory) + _log_timing( + f"_verify_fixtures_post_merge: done in {time.time() - t0:.1f}s" + ) # Generate index file for all produced fixtures by merging partial indexes. # Only merge if partial indexes were actually written (i.e., tests produced @@ -1797,7 +1872,17 @@ def pytest_sessionfinish(session: pytest.Session, exitstatus: int) -> None: ): meta_dir = fixture_output.directory / ".meta" if meta_dir.exists() and any(meta_dir.glob("partial_index*.jsonl")): + _log_timing("merge_partial_indexes: starting...") + t0 = time.time() merge_partial_indexes(fixture_output.directory, quiet_mode=True) + _log_timing( + f"merge_partial_indexes: done in {time.time() - t0:.1f}s" + ) # Create tarball of the output directory if the output is a tarball. + _log_timing("create_tarball: starting...") + t0 = time.time() fixture_output.create_tarball() + _log_timing(f"create_tarball: done in {time.time() - t0:.1f}s") + + _log_timing("Finalization (master): COMPLETE") diff --git a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/filler/fixture_output.py b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/filler/fixture_output.py index 110f0934342..287aa324f1b 100644 --- a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/filler/fixture_output.py +++ b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/filler/fixture_output.py @@ -1,7 +1,9 @@ """Fixture output configuration for generated test fixtures.""" import shutil +import subprocess import tarfile +import warnings from pathlib import Path import pytest @@ -219,11 +221,28 @@ def create_directories(self, is_master: bool) -> None: parents=True, exist_ok=True ) + @staticmethod + def _pigz_available() -> bool: + """Check if pigz (parallel gzip) is available on the system.""" + return shutil.which("pigz") is not None + def create_tarball(self) -> None: - """Create tarball of the output directory if configured to do so.""" + """ + Create tarball of the output directory if configured to do so. + + Automatically uses pigz for parallel compression if available, + otherwise falls back to standard single-threaded gzip. + """ if not self.is_tarball: return + if self._pigz_available(): + self._create_tarball_with_pigz() + else: + self._create_tarball_standard() + + def _create_tarball_standard(self) -> None: + """Create tarball using Python's tarfile module (single-threaded).""" with tarfile.open(self.output_path, "w:gz") as tar: for file in self.directory.rglob("*"): if file.suffix in {".json", ".ini"}: @@ -232,6 +251,43 @@ def create_tarball(self) -> None: ) tar.add(file, arcname=arcname) + def _create_tarball_with_pigz(self) -> None: + """ + Create tarball using Python tarfile + pigz for parallel compression. + + This approach uses Python's tarfile to create the uncompressed .tar + (which correctly handles arcnames across all platforms), then uses + pigz for parallel gzip compression with auto-detected core count. + """ + # Create uncompressed tar first (output_path minus .gz suffix) + temp_tar = self.output_path.with_suffix("") # Remove .gz suffix + + try: + # Use Python tarfile for cross-platform tar creation with arcnames + with tarfile.open(temp_tar, "w") as tar: + for file in self.directory.rglob("*"): + if file.suffix in {".json", ".ini"}: + arcname = Path("fixtures") / file.relative_to( + self.directory + ) + tar.add(file, arcname=arcname) + + # Compress with pigz (parallel gzip, auto-detects available cores) + subprocess.run( + ["pigz", "-f", str(temp_tar)], check=True, capture_output=True + ) + except (subprocess.CalledProcessError, OSError) as e: + # Clean up temp file if it exists + if temp_tar.exists(): + temp_tar.unlink() + # Fall back to standard tarball creation with warning + warnings.warn( + f"pigz compression failed ({type(e).__name__}: {e}), " + "falling back to standard gzip", + stacklevel=2, + ) + self._create_tarball_standard() + @classmethod def from_config(cls, config: pytest.Config) -> "FixtureOutput": """Create a FixtureOutput instance from pytest configuration.""" diff --git a/packages/testing/src/execution_testing/fixtures/collector.py b/packages/testing/src/execution_testing/fixtures/collector.py index 0e1c05da716..3b10c823ee8 100644 --- a/packages/testing/src/execution_testing/fixtures/collector.py +++ b/packages/testing/src/execution_testing/fixtures/collector.py @@ -3,7 +3,6 @@ of generated fixtures. """ -import heapq import json import os import re @@ -14,7 +13,6 @@ IO, ClassVar, Dict, - Generator, List, Literal, Optional, @@ -28,26 +26,6 @@ from .file import Fixtures -def _sorted_entries_from_partial( - partial_path: Path, -) -> Generator[Tuple[str, str], None, None]: - """ - Generator yielding (key, value) pairs from a partial file, sorted by key. - - Loads one partial file into memory at a time (not all partials together). - Each worker's partial file is typically small relative to the total. - """ - entries = [] - with open(partial_path) as f: - for line in f: - line = line.strip() - if line: - entry = json.loads(line) - entries.append((entry["k"], entry["v"])) - entries.sort(key=lambda x: x[0]) - yield from entries - - def merge_partial_fixture_files(output_dir: Path) -> None: """ Merge all partial fixture JSONL files into final JSON fixture files. @@ -55,9 +33,8 @@ def merge_partial_fixture_files(output_dir: Path) -> None: Called at session end after all workers have written their partials. Each partial file contains JSONL lines: {"k": fixture_id, "v": json_str} - Uses k-way merge: each partial file is sorted individually, then merged - using heapq.merge. This keeps memory usage proportional to the largest - single partial file, not the total of all partials. + Processes one target file at a time, reading its partials sequentially + into a dict. Memory = O(entries per target), freed before next target. """ # Find all partial files partial_files = list(output_dir.rglob("*.partial.*.jsonl")) @@ -82,26 +59,31 @@ def merge_partial_fixture_files(output_dir: Path) -> None: # Merge each group into its target file for target_path, partials in partials_by_target.items(): - # K-way merge: sort each partial individually, then merge streams - # Memory = O(largest single partial), not O(sum of all partials) - sorted_iterators = [_sorted_entries_from_partial(p) for p in partials] - merged = heapq.merge(*sorted_iterators, key=lambda x: x[0]) - - # Stream merged entries to output file + # Read partials sequentially into dict (one at a time) + entries: Dict[str, str] = {} + for partial in partials: + with open(partial) as f: + for line in f: + line = line.strip() + if line: + entry = json.loads(line) + entries[entry["k"]] = entry["v"] + + # Write sorted entries to output file with open(target_path, "w") as out_f: out_f.write("{\n") - first = True - for key, value in merged: - if not first: - out_f.write(",\n") - first = False + sorted_keys = sorted(entries.keys()) + last_idx = len(sorted_keys) - 1 + for i, key in enumerate(sorted_keys): key_json = json.dumps(key) - value_indented = value.replace("\n", "\n ") + value_indented = entries[key].replace("\n", "\n ") out_f.write(f" {key_json}: {value_indented}") - if not first: - out_f.write("\n") + out_f.write(",\n" if i < last_idx else "\n") out_f.write("}") + # Free memory before processing next target + entries.clear() + # Clean up partial files for partial in partials: partial.unlink() diff --git a/packages/testing/src/execution_testing/fixtures/pre_alloc_groups.py b/packages/testing/src/execution_testing/fixtures/pre_alloc_groups.py index 41aa1d1150a..6ade0d7666f 100644 --- a/packages/testing/src/execution_testing/fixtures/pre_alloc_groups.py +++ b/packages/testing/src/execution_testing/fixtures/pre_alloc_groups.py @@ -1,6 +1,7 @@ """Pre-allocation group models for test fixture generation.""" import json +import os from dataclasses import dataclass from pathlib import Path from typing import ( @@ -11,12 +12,12 @@ KeysView, List, Literal, + Optional, Self, Tuple, ) -from filelock import FileLock -from pydantic import Field, PrivateAttr +from pydantic import Field, PrivateAttr, ValidationError from execution_testing.base_types import ( CamelModel, @@ -76,44 +77,104 @@ def build(self) -> "PreAllocGroup": genesis=self.calculate_genesis(), ) - def to_file(self, file: Path) -> None: - """Save PreAllocGroup to a file.""" - lock_file_path = file.with_suffix(".lock") - with FileLock(lock_file_path): - if file.exists(): - with open(file, "r") as f: - previous_pre_alloc_group = ( - PreAllocGroup.model_validate_json(f.read()) - ) - for account in previous_pre_alloc_group.pre: - existing_account = previous_pre_alloc_group.pre[account] - if account not in self.pre: - self.pre[account] = existing_account + def to_partial_file( + self, file: Path, worker_id: Optional[str] = None + ) -> None: + """ + Save PreAllocGroupBuilder to a partial file (no locking). + + Each worker writes its own partial file, which are merged at session + end by merge_partial_group_files(). This eliminates lock contention + that caused workers to take 30-180+ seconds each. + + Saves the builder format (without genesis/state_root) to avoid + expensive state root computation during Phase 1. State root is + computed once when loading in Phase 2 via PreAllocGroup.from_file(). + """ + suffix = f".{worker_id}" if worker_id else ".main" + partial_path = file.with_suffix(f".partial{suffix}.json") + partial_path.write_text( + self.model_dump_json(by_alias=True, exclude_none=True, indent=2) + ) + + +def _get_worker_id() -> Optional[str]: + """Get the xdist worker ID from environment, or None if not in xdist.""" + return os.environ.get("PYTEST_XDIST_WORKER") + + +def merge_partial_group_files(folder: Path) -> None: + """ + Merge all partial group files into final group files. + + Called by master process after all workers have finished Phase 1. + Each worker writes {group_hash}.partial.{worker_id}.json files, + which are merged here into {group_hash}.json files. + """ + partial_files = list(folder.glob("*.partial.*.json")) + if not partial_files: + return + + # Group partials by target: {hash}.partial.{worker}.json -> {hash}.json + partials_by_target: Dict[Path, List[Path]] = {} + for partial in partial_files: + name = partial.name + idx = name.find(".partial.") + if idx == -1: + continue + target_name = name[:idx] + ".json" + target_path = partial.parent / target_name + if target_path not in partials_by_target: + partials_by_target[target_path] = [] + partials_by_target[target_path].append(partial) + + # Merge each group's partials + for target_path, partials in partials_by_target.items(): + merged_builder: Optional[PreAllocGroupBuilder] = None + + for partial in partials: + builder = PreAllocGroupBuilder.model_validate_json( + partial.read_text() + ) + + if merged_builder is None: + merged_builder = builder + else: + # Merge pre-allocations (check for collisions) + for account in builder.pre: + new_account = builder.pre[account] + if account not in merged_builder.pre: + merged_builder.pre[account] = new_account else: - new_account = self.pre[account] + existing_account = merged_builder.pre[account] if new_account != existing_account: - # This procedure fails during xdist worker's - # pytest_sessionfinish and is not reported to the - # master thread. We signal here that the groups - # created contain a collision. - collision_file_path = file.with_suffix(".fail") + # Write collision file for error reporting + collision_file_path = target_path.with_suffix( + ".fail" + ) collision_exception = Alloc.CollisionError( address=account, account_1=existing_account, account_2=new_account, ) - with open(collision_file_path, "w") as f: - f.write( - json.dumps(collision_exception.to_json()) - ) + collision_file_path.write_text( + json.dumps(collision_exception.to_json()) + ) raise collision_exception - self.test_ids.extend(previous_pre_alloc_group.test_ids) - with open(file, "w") as f: - f.write( - self.build().model_dump_json( - by_alias=True, exclude_none=True, indent=2 - ) + + # Merge test_ids + merged_builder.test_ids.extend(builder.test_ids) + + # Clean up partial file after processing + partial.unlink() + + # Write final merged file + if merged_builder is not None: + target_path.write_text( + merged_builder.model_dump_json( + by_alias=True, exclude_none=True, indent=2 ) + ) class PreAllocGroupBuilders(EthereumTestRootModel): @@ -128,11 +189,16 @@ class PreAllocGroupBuilders(EthereumTestRootModel): root: Dict[str, PreAllocGroupBuilder] - def to_folder(self, folder: Path) -> None: - """Save PreAllocGroups to a folder of pre-allocation files.""" + def to_folder(self, folder: Path, worker_id: Optional[str] = None) -> None: + """ + Save PreAllocGroups to a folder as partial files. + + Each worker writes its own partial files (no lock contention). + Call merge_partial_group_files() on master after all workers finish. + """ for key, value in self.root.items(): assert value is not None, f"Value for key {key} is None" - value.to_file(folder / f"{key}.json") + value.to_partial_file(folder / f"{key}.json", worker_id=worker_id) def add_test_pre( self, @@ -271,9 +337,28 @@ def model_post_init(self, __context: Any) -> None: @classmethod def from_file(cls, file: Path) -> Self: - """Load a pre-allocation group from a JSON file.""" + """ + Load a pre-allocation group from a JSON file. + + Handles both builder format (without genesis) and full format (with + genesis). If genesis is missing, computes it from the pre-allocation + state. This ensures state root computation happens exactly once when + loading in Phase 2, not during Phase 1 merging. + """ with open(file) as f: - return cls.model_validate_json(f.read()) + data = f.read() + + # Try loading as full PreAllocGroup first (backwards compatibility) + try: + return cls.model_validate_json(data) + except ValidationError: + pass + + # Load as builder format and compute genesis + builder = PreAllocGroupBuilder.model_validate_json(data) + built = builder.build() + # Use cls.model_validate to ensure proper Self return type + return cls.model_validate(built.model_dump()) class PreAllocGroups(EthereumTestRootModel):