Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactor(perf): write partial JSON fixture files + merge at end
- xdist workers were writing to the same fixture JSON file causing O(n²) work...
  each worker had to read, parse, and rewrite all previous entries.

- now workers write to their own partial JSONL file (append-only, O(1))
  - test_blob_txs.partial.gw0.jsonl
  - test_blob_txs.partial.gw1.jsonl
  - etc.
  .. and at session end, ``merge_partial_fixture_files()`` combines all partials
  into the final JSON file

Test teardown on some tests dropped from ~80s to ~1s
  • Loading branch information
fselmo committed Jan 28, 2026
commit 00a835cf3704d3f7cd529de21a76cd8b1111b7b6
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
PreAllocGroupBuilders,
PreAllocGroups,
TestInfo,
merge_partial_fixture_files,
)
from execution_testing.forks import (
Fork,
Expand Down Expand Up @@ -1240,12 +1241,12 @@ def fixture_collector(
generate_index=request.config.getoption("generate_index"),
)
yield fixture_collector
fixture_collector.dump_fixtures()
worker_id = os.environ.get("PYTEST_XDIST_WORKER", None)
fixture_collector.dump_fixtures(worker_id)
if do_fixture_verification:
fixture_collector.verify_fixture_files(evm_fixture_verification)
# Write partial index for this worker/scope
if fixture_collector.generate_index:
worker_id = os.environ.get("PYTEST_XDIST_WORKER", None)
fixture_collector.write_partial_index(worker_id)


Expand Down Expand Up @@ -1648,6 +1649,9 @@ def pytest_sessionfinish(session: pytest.Session, exitstatus: int) -> None:
if fixture_output.is_stdout or is_help_or_collectonly_mode(session.config):
return

# Merge partial fixture files from all workers into final JSON files
merge_partial_fixture_files(fixture_output.directory)

# Remove any lock files that may have been created.
for file in fixture_output.directory.rglob("*.lock"):
file.unlink()
Expand Down
7 changes: 6 additions & 1 deletion packages/testing/src/execution_testing/fixtures/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@
BlockchainFixture,
BlockchainFixtureCommon,
)
from .collector import FixtureCollector, TestInfo
from .collector import (
FixtureCollector,
TestInfo,
merge_partial_fixture_files,
)
from .consume import FixtureConsumer
from .pre_alloc_groups import (
PreAllocGroup,
Expand Down Expand Up @@ -45,4 +49,5 @@
"StateFixture",
"TestInfo",
"TransactionFixture",
"merge_partial_fixture_files",
]
175 changes: 91 additions & 84 deletions packages/testing/src/execution_testing/fixtures/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,70 @@
from .file import Fixtures


def merge_partial_fixture_files(output_dir: Path) -> None:
    """
    Merge all partial fixture JSONL files into final JSON fixture files.

    Called at session end after all workers have written their partials.
    Each partial file is named ``<target stem>.partial.<worker>.jsonl`` and
    contains JSONL lines of the form ``{"k": fixture_id, "v": json_str}``,
    where ``json_str`` is the already-serialized fixture value.

    For every target, the entries of all of its partials are combined,
    written to ``<target stem>.json`` next to the partials with the fixture
    ids in sorted order, and the partials plus their sibling ``.lock`` files
    are deleted. If the same fixture id appears more than once, the entry
    read last wins; partials are processed in sorted path order so the
    outcome does not depend on directory enumeration order.

    Does nothing when ``output_dir`` contains no partial files.

    Raises:
        json.JSONDecodeError: if a non-blank line of a partial is not JSON.
        KeyError: if a line lacks the ``"k"`` or ``"v"`` member.

    """
    marker = ".partial."

    # Group partial files by their target fixture file,
    # e.g. "test.partial.gw0.jsonl" -> "test.json".
    partials_by_target: Dict[Path, List[Path]] = {}
    for partial in sorted(output_dir.rglob("*.partial.*.jsonl")):
        # Split on the LAST marker: the writer appends
        # ".partial.<worker>.jsonl" to the fixture stem, so an earlier
        # ".partial." belongs to the fixture's own name.
        stem, found, _ = partial.name.rpartition(marker)
        if not found:
            continue
        target_path = partial.parent / (stem + ".json")
        partials_by_target.setdefault(target_path, []).append(partial)

    # Merge each group into its target file.
    for target_path, partials in partials_by_target.items():
        entries: Dict[str, str] = {}

        for partial in partials:
            # Lines are produced by json.dumps with its default ASCII-only
            # output, so decoding as UTF-8 is always safe.
            with open(partial, encoding="utf-8") as f:
                for raw_line in f:
                    line = raw_line.strip()
                    if not line:
                        continue
                    entry = json.loads(line)
                    entries[entry["k"]] = entry["v"]

        # Values are already JSON text; only re-indent their continuation
        # lines so they nest inside the outer object.
        body = ",\n".join(
            f" {json.dumps(key)}: " + entries[key].replace("\n", "\n ")
            for key in sorted(entries)
        )
        if body:
            body += "\n"
        target_path.write_text("{\n" + body + "}", encoding="utf-8")

        # Clean up the partials and the lock files created alongside them.
        for partial in partials:
            partial.unlink()
            partial.with_suffix(".lock").unlink(missing_ok=True)


@dataclass(kw_only=True, slots=True)
class TestInfo:
"""Contains test information from the current node."""
Expand Down Expand Up @@ -211,7 +275,7 @@ def add_fixture(self, info: TestInfo, fixture: BaseFixture) -> Path:

return fixture_path

def dump_fixtures(self) -> None:
def dump_fixtures(self, worker_id: str | None = None) -> None:
"""Dump all collected fixtures to their respective files."""
if self.output_dir.name == "stdout":
combined_fixtures = {
Expand All @@ -228,97 +292,40 @@ def dump_fixtures(self) -> None:
raise TypeError(
"All fixtures in a single file must have the same format."
)
self._write_fixture_file(fixture_path, fixtures)
self._write_partial_fixtures(fixture_path, fixtures, worker_id)

self.all_fixtures.clear()
self._pre_serialized.clear()

def _write_fixture_file(self, file_path: Path, fixtures: Fixtures) -> None:
    """
    Write fixtures to file using pre-serialized JSON strings.

    While holding a ``FileLock`` on the sibling ``.lock`` path, any entries
    already present in ``file_path`` are loaded as raw strings, the given
    fixtures are laid over them, and the whole file is rewritten as a single
    JSON object with its keys in sorted order. Assembling the object from
    per-fixture strings avoids one large json.dumps() call at teardown.
    """
    with FileLock(file_path.with_suffix(".lock")):
        # Other xdist workers may already have written to this file; keep
        # their entries as raw text instead of re-serializing them.
        merged: Dict[str, str] = (
            self._extract_entries_from_file(file_path)
            if file_path.exists()
            else {}
        )

        # Prefer the string serialized earlier; serialize now otherwise.
        for fixture_name in fixtures:
            merged[fixture_name] = (
                self._pre_serialized[fixture_name]
                if fixture_name in self._pre_serialized
                else json.dumps(
                    fixtures[fixture_name].json_dict_with_info(), indent=4
                )
            )

        # Emit the object by hand: one "key: value" member per fixture,
        # comma-separated, with value continuation lines re-indented.
        ordered = sorted(merged)
        final = len(ordered) - 1
        with open(file_path, "w") as out:
            out.write("{\n")
            for position, fixture_name in enumerate(ordered):
                nested = merged[fixture_name].replace("\n", "\n ")
                terminator = ",\n" if position < final else "\n"
                out.write(
                    f" {json.dumps(fixture_name)}: {nested}{terminator}"
                )
            out.write("}")

def _extract_entries_from_file(self, file_path: Path) -> Dict[str, str]:
    """
    Extract fixture entries from an existing file as raw JSON strings.

    This avoids re-serializing entries written by other xdist workers,
    which would be O(n) json.dumps() calls at teardown time.

    Returns a mapping of each top-level key of the JSON object in
    ``file_path`` to the source text of its value, with the nesting
    indentation removed.
    """
    content = file_path.read_text()
    entries: Dict[str, str] = {}
    decoder = json.JSONDecoder()

    # Skip opening brace and whitespace
    pos = content.find("{") + 1

    while pos < len(content):
        # Skip whitespace
        while pos < len(content) and content[pos] in " \t\n":
            pos += 1

        if pos >= len(content) or content[pos] == "}":
            break

        # Decode the key
        key, end = decoder.raw_decode(content, pos)
        pos = end

        # Skip colon and whitespace
        while pos < len(content) and content[pos] in " \t\n:":
            pos += 1

        # Decode the value only to find where it ends; the parsed object
        # is discarded and the raw text slice is kept instead.
        value_start = pos
        _, end = decoder.raw_decode(content, pos)
        value_end = end

        # Extract the raw value string and remove the indentation the file
        # format adds to continuation lines, recovering the original text.
        raw_value = content[value_start:value_end]
        entries[key] = raw_value.replace("\n ", "\n")

        pos = value_end

        # Skip comma and whitespace
        while pos < len(content) and content[pos] in " \t\n,":
            pos += 1

    return entries

def _write_partial_fixtures(
    self, file_path: Path, fixtures: Fixtures, worker_id: str | None
) -> None:
    """
    Write fixtures to a partial JSONL file (append-only).

    The partial lives next to ``file_path`` and is named by replacing its
    suffix with ``.partial.<worker_id>.jsonl`` (``.partial.main.jsonl``
    when ``worker_id`` is empty or None).

    Each line is a JSON object: {"k": fixture_id, "v": json_str}, where
    ``json_str`` is the fixture's serialized JSON text.
    This avoids O(n) merge work per worker - just O(1) append.
    Final merge to JSON happens at session end.
    """
    suffix = f".{worker_id}" if worker_id else ".main"
    partial_path = file_path.with_suffix(f".partial{suffix}.jsonl")
    partial_path.parent.mkdir(parents=True, exist_ok=True)
    lock_file_path = partial_path.with_suffix(".lock")

    # Serialize every line before taking the lock so it is held only for
    # the append itself.
    lines = []
    for name in fixtures:
        if name in self._pre_serialized:
            value = self._pre_serialized[name]
        else:
            value = json.dumps(
                fixtures[name].json_dict_with_info(), indent=4
            )
        # Store as JSONL: {"k": key, "v": pre-serialized value string}
        lines.append(json.dumps({"k": name, "v": value}) + "\n")

    with FileLock(lock_file_path), open(partial_path, "a") as f:
        f.writelines(lines)

def verify_fixture_files(
self, evm_fixture_verification: FixtureConsumer
Expand Down
Loading
Loading