Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .github/workflows/build-baseline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,10 @@ jobs:
- uses: actions/download-artifact@3e5f45b2cfb9172054b4087a40e8e0b5a5461e7c # v8.0.1
with:
pattern: bandscope-*-${{ github.sha }}
path: artifacts
merge-multiple: true
path: downloaded-artifacts
skip-decompress: true
- name: Extract release artifacts with repo-owned validation
run: python3 scripts/release/extract_release_artifacts.py downloaded-artifacts artifacts
- name: Generate release CycloneDX SBOM
uses: anchore/sbom-action@57aae528053a48a3f6235f2d9461b05fbcb7366d # v0.23.1
with:
Expand Down
142 changes: 137 additions & 5 deletions scripts/checks/verify_supply_chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,12 @@
"ossf scorecard artifact download must use skip-decompress: true and "
"repo-owned extraction before normalization"
)
# Message reported by release_artifact_download_decompression_violations when a
# release artifact download step does not keep the ZIP compressed or is not
# followed by the repo-owned extractor.
RELEASE_DOWNLOAD_DECOMPRESSION_VIOLATION = (
"release artifact download must use skip-decompress: true and "
"repo-owned extraction before asset validation"
)
OSSF_ARTIFACT_EXTRACTOR = "scripts/checks/extract_scorecard_artifact.py"
# Repository-relative script path that a workflow ``run`` command must name as
# its second token to count as the release extraction step.
RELEASE_ARTIFACT_EXTRACTOR = "scripts/release/extract_release_artifacts.py"
OSSF_SARIF_NORMALIZER = "scripts/checks/normalize_scorecard_sarif.py"
OSSF_NORMALIZED_SARIF = "normalized-scorecard-results.sarif"
# Exact ``sarif_file:`` line text built from the normalized SARIF file name.
OSSF_NORMALIZED_SARIF_UPLOAD = f"sarif_file: {OSSF_NORMALIZED_SARIF}"
Expand Down Expand Up @@ -192,6 +197,29 @@ def step_run_command_from_block(step_lines: list[str], step_indent: int) -> str:
return "\n".join(command_lines)


def step_with_value_from_block(
    step_lines: list[str], step_indent: int, key: str
) -> str | None:
    """Return a workflow step ``with`` value for ``key`` when scoped under with.

    Only *direct* children of the step's ``with:`` mapping are considered.
    Lines nested more deeply (for example the body of a ``|`` block scalar or
    a nested mapping) are ignored, so text that merely looks like
    ``key: value`` inside another input's value cannot satisfy the lookup.

    Args:
        step_lines: Raw lines of a single workflow step.
        step_indent: Indentation of the step itself; ``with:`` must be
            indented deeper than this to be recognized.
        key: Input name to look up under ``with:``.

    Returns:
        The value with surrounding whitespace and quote characters removed,
        or ``None`` when the step has no ``with:`` mapping or the mapping has
        no such direct key.
    """
    with_indent: int | None = None
    child_indent: int | None = None
    key_pattern = re.compile(rf"^\s*{re.escape(key)}\s*:\s*(?P<value>.*?)\s*$")
    for step_line in step_lines:
        # Drop trailing comments; blank and comment-only lines carry no keys.
        stripped = step_line.partition("#")[0].rstrip()
        if not stripped.strip():
            continue
        indent = len(step_line) - len(step_line.lstrip(" "))
        if with_indent is None:
            # Still searching for the ``with:`` header of this step.
            if indent > step_indent and stripped.strip() == "with:":
                with_indent = indent
            continue
        if indent <= with_indent:
            # Dedent back to (or above) ``with:`` ends the mapping.
            break
        if child_indent is None:
            # The first entry fixes the indentation shared by all direct keys.
            child_indent = indent
        if indent != child_indent:
            # Nested content (block scalar body, sub-mapping) is not an input.
            continue
        match = key_pattern.match(stripped)
        if match:
            return match.group("value").strip().strip("'\"")
    return None


def logical_workflow_lines(content: str) -> list[tuple[int, str]]:
"""Return workflow lines with shell backslash continuations folded."""
logical_lines: list[tuple[int, str]] = []
Expand Down Expand Up @@ -576,13 +604,16 @@ def invokes_scorecard_extractor(command: str) -> bool:
)

violations: list[str] = []
for index, _, step_lines in step_blocks:
for index, block_indent, step_lines in step_blocks:
step_content = "\n".join(line.partition("#")[0] for line in step_lines)
if "actions/download-artifact" not in step_content:
continue
if "ossf-scorecard-results" not in step_content:
continue
if "skip-decompress: true" not in step_content:
if (
step_with_value_from_block(step_lines, block_indent, "skip-decompress")
!= "true"
):
violations.append(OSSF_DOWNLOAD_DECOMPRESSION_VIOLATION)
continue

Expand Down Expand Up @@ -632,6 +663,102 @@ def invokes_scorecard_extractor(command: str) -> bool:
return []


def release_artifact_download_decompression_violations(content: str) -> list[str]:
    """Return release downloads that rely on action-owned ZIP decompression.

    A release download step is any workflow step whose comment-stripped text
    mentions both ``actions/download-artifact`` and the
    ``bandscope-*-${{ github.sha }}`` pattern. Every such step must:

    * set ``skip-decompress: true`` under its ``with:`` mapping;
    * be followed, among the steps that share its job content, by a blocking
      step (no ``if:`` and no ``continue-on-error:`` key) whose ``run``
      command is exactly ``python``/``python3``, the release extractor
      script, ``downloaded-artifacts`` and ``artifacts``;
    * have that extractor step positioned no later than the first following
      step whose ``run`` command mentions the release asset validator, when
      such a step exists.

    Args:
        content: Raw text of one workflow file.

    Returns:
        A single-element list holding the release decompression violation
        message when at least one download step breaks a rule, otherwise an
        empty list.
    """
    content_without_comments = "\n".join(
        line.partition("#")[0] for line in content.splitlines()
    )
    # Cheap early exits: workflows without a release download have nothing to check.
    if "actions/download-artifact" not in content_without_comments:
        return []
    if "bandscope-*-${{ github.sha }}" not in content_without_comments:
        return []

    lines = content.splitlines()
    step_blocks = workflow_step_blocks(lines)

    def invokes_release_extractor(command: str) -> bool:
        """Return True when ``command`` is exactly the extractor invocation."""
        try:
            tokens = shlex.split(command)
        except ValueError:
            # Unbalanced quotes: fall back to plain whitespace splitting.
            tokens = re.split(r"\s+", command)
        cleaned_tokens = [token.strip("'\"") for token in tokens if token.strip("'\"")]
        # A leading YAML block-scalar indicator is not part of the command.
        if cleaned_tokens and cleaned_tokens[0] in {">", ">-", "|", "|-"}:
            cleaned_tokens = cleaned_tokens[1:]
        return (
            len(cleaned_tokens) == 4
            and cleaned_tokens[0] in {"python", "python3"}
            and cleaned_tokens[1] == RELEASE_ARTIFACT_EXTRACTOR
            and cleaned_tokens[2] == "downloaded-artifacts"
            and cleaned_tokens[3] == "artifacts"
        )

    def is_blocking_required_step(block_lines: list[str]) -> bool:
        """Return True when the step is neither conditional nor failure-tolerant."""
        step_content = "\n".join(line.partition("#")[0] for line in block_lines)
        # The key may be the first one in the step and therefore be preceded
        # by the list marker ("- if: ..."), so the dash is matched optionally.
        return (
            re.search(
                r"^\s*(?:-\s+)?(?:continue-on-error|if)\s*:",
                step_content,
                flags=re.MULTILINE,
            )
            is None
        )

    violations: list[str] = []
    for index, block_indent, step_lines in step_blocks:
        step_content = "\n".join(line.partition("#")[0] for line in step_lines)
        if "actions/download-artifact" not in step_content:
            continue
        if "bandscope-*-${{ github.sha }}" not in step_content:
            continue
        if (
            step_with_value_from_block(step_lines, block_indent, "skip-decompress")
            != "true"
        ):
            violations.append(RELEASE_DOWNLOAD_DECOMPRESSION_VIOLATION)
            continue

        # Restrict the search to steps whose job content equals this step's.
        job_content = workflow_job_content_for_step(lines, index)
        job_step_blocks = [
            block
            for block in step_blocks
            if workflow_job_content_for_step(lines, block[0]) == job_content
        ]
        later_steps = [
            (later_indent, later_lines)
            for later_index, later_indent, later_lines in job_step_blocks
            if later_index > index
        ]
        extractor_step_position = next(
            (
                position
                for position, (later_indent, later_lines) in enumerate(later_steps)
                if invokes_release_extractor(
                    step_run_command_from_block(later_lines, later_indent)
                )
                and is_blocking_required_step(later_lines)
            ),
            None,
        )
        validator_step_position = next(
            (
                position
                for position, (later_indent, later_lines) in enumerate(later_steps)
                if (
                    RELEASE_ASSET_VALIDATOR
                    in step_run_command_from_block(later_lines, later_indent)
                )
            ),
            None,
        )
        if extractor_step_position is None:
            violations.append(RELEASE_DOWNLOAD_DECOMPRESSION_VIOLATION)
            continue
        if (
            validator_step_position is not None
            and extractor_step_position > validator_step_position
        ):
            # Extraction must not happen after the assets were validated.
            violations.append(RELEASE_DOWNLOAD_DECOMPRESSION_VIOLATION)
            continue
    # Report the message once per workflow, however many steps are affected.
    if violations:
        return [RELEASE_DOWNLOAD_DECOMPRESSION_VIOLATION]
    return []


def verify_workflow_coverage() -> list[str]:
"""Return workflow trigger and artifact coverage violations."""
missing: list[str] = []
Expand Down Expand Up @@ -713,6 +840,14 @@ def verify_workflow_coverage() -> list[str]:
missing.append(
"build workflow should not rely on macos-latest for architecture coverage"
)
workflow_paths = sorted(Path(".github/workflows").glob("*.yml")) + sorted(
Path(".github/workflows").glob("*.yaml")
)
for workflow_path in workflow_paths:
workflow_content = workflow_path.read_text(encoding="utf-8")
missing.extend(
release_artifact_download_decompression_violations(workflow_content)
)
scorecard = read_workflow(
Path(".github/workflows/ossf-scorecard.yml"), "ossf scorecard", missing
)
Expand All @@ -735,9 +870,6 @@ def verify_workflow_coverage() -> list[str]:
missing.append(
"ossf scorecard publish_results must use the repository default branch guard"
)
workflow_paths = sorted(Path(".github/workflows").glob("*.yml")) + sorted(
Path(".github/workflows").glob("*.yaml")
)
for workflow_path in workflow_paths:
workflow_content = workflow_path.read_text(encoding="utf-8")
missing.extend(
Expand Down
150 changes: 150 additions & 0 deletions scripts/release/extract_release_artifacts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
"""Safely extract zipped release artifacts downloaded by GitHub Actions."""

from __future__ import annotations

import argparse
import os
import re
import stat
import zipfile
from pathlib import Path
from typing import IO

# Allowlist for ZIP member names:
# bandscope-<windows|macos>-<amd64|arm64>-<12 lowercase hex chars> with an
# .exe/.msi/.dmg extension, optionally followed by .sha256 or .manifest.txt.
RELEASE_MEMBER = re.compile(
r"^bandscope-(?:windows|macos)-(?:amd64|arm64)-[0-9a-f]{12}"
r"\.(?:exe|msi|dmg)(?:\.sha256|\.manifest\.txt)?$"
)
# Per-member size cap (512 MiB), checked against the declared ZIP size and
# again against the bytes actually written.
MAX_RELEASE_ARTIFACT_BYTES = 512 * 1024 * 1024
# Cap (4 GiB) on the sum of declared member sizes across all extracted ZIPs.
MAX_TOTAL_RELEASE_ARTIFACT_BYTES = 4 * 1024 * 1024 * 1024
# Maximum number of distinct member files accepted in one extraction run.
MAX_RELEASE_ARTIFACT_FILES = 24
# Chunk size (64 KiB) used when streaming member data to disk.
READ_CHUNK_BYTES = 64 * 1024


def ensure_non_symlink_path(path: Path, *, path_kind: str = "output path") -> None:
    """Reject ``path`` if it, or any existing ancestor of it, is a symlink.

    The path is made absolute and inspected component by component from the
    filesystem root down to the path itself. Components that do not exist yet
    are skipped.

    Args:
        path: Path to inspect; it does not have to exist.
        path_kind: Label used in the error message.

    Raises:
        ValueError: If an existing component is a symbolic link.
    """
    anchored = path.absolute()
    # ``parents`` lists the nearest ancestor first; reverse it to go root-first.
    root_to_leaf = (*reversed(anchored.parents), anchored)
    for node in root_to_leaf:
        try:
            mode = os.lstat(node).st_mode
        except FileNotFoundError:
            continue
        if stat.S_ISLNK(mode):
            raise ValueError(f"symlinked {path_kind} is not allowed: {node}")


def artifact_zip_paths(source: Path) -> list[Path]:
    """Collect the ZIP file(s) named by ``source``, refusing symlinked paths.

    Args:
        source: Either a single ``.zip`` file or a directory whose immediate
            ``.zip`` entries are the artifacts.

    Returns:
        A one-element list for a file source, or the directory's ``.zip``
        entries in sorted order.

    Raises:
        ValueError: If ``source`` is missing, is a file without a ``.zip``
            suffix, is a directory with no ``.zip`` entries, or if any
            inspected path involves a symlink.
    """
    kind = "artifact path"

    if source.is_file():
        ensure_non_symlink_path(source, path_kind=kind)
        if source.suffix == ".zip":
            return [source]
        raise ValueError(f"expected a release artifact zip: {source}")

    if not source.is_dir():
        raise ValueError(f"artifact source does not exist: {source}")

    ensure_non_symlink_path(source, path_kind=kind)
    zip_entries = sorted(entry for entry in source.iterdir() if entry.suffix == ".zip")
    if not zip_entries:
        raise ValueError(f"expected at least one release artifact zip in {source}")
    for entry in zip_entries:
        ensure_non_symlink_path(entry, path_kind=kind)
    return zip_entries


def validate_member(member: zipfile.ZipInfo) -> None:
    """Raise unless ``member`` is an allowlisted, regular, size-bounded file.

    Args:
        member: ZIP directory entry to check.

    Raises:
        ValueError: If the name is not allowlisted, is absolute, contains a
            ``..`` component, denotes a directory or a symlink, or if the
            declared size exceeds the per-member limit.
    """
    name = member.filename
    parsed = Path(name)
    # The upper 16 bits of external_attr hold the Unix mode bits.
    posix_mode = member.external_attr >> 16

    allowlisted = RELEASE_MEMBER.fullmatch(name) is not None
    escapes_output = parsed.is_absolute() or ".." in parsed.parts
    not_regular = member.is_dir() or stat.S_ISLNK(posix_mode)

    if not allowlisted or escapes_output or not_regular:
        raise ValueError(f"unexpected release artifact member: {name}")
    if member.file_size > MAX_RELEASE_ARTIFACT_BYTES:
        raise ValueError(f"release artifact member too large: {name}")


def write_new_file_without_following_symlinks(
    target: Path, source_file: IO[bytes]
) -> None:
    """Stream-write to a new file without following an existing symlink.

    The target is created exclusively (``O_CREAT | O_EXCL``) with mode
    ``0o600``, so an existing path at ``target`` makes ``os.open`` fail
    instead of being overwritten. Data is copied in ``READ_CHUNK_BYTES``
    chunks and the copy is aborted once more than
    ``MAX_RELEASE_ARTIFACT_BYTES`` have been read.

    Args:
        target: Path of the file to create.
        source_file: Binary stream to copy from.

    Raises:
        ValueError: If the stream yields more than the per-member byte limit.
        OSError: If the file cannot be created (for example it already
            exists).
    """
    flags = os.O_CREAT | os.O_EXCL | os.O_WRONLY
    # O_NOFOLLOW is not defined on every platform; add it where available.
    flags |= getattr(os, "O_NOFOLLOW", 0)
    # Where O_BINARY exists (Windows), a descriptor from os.open() is text-mode
    # unless the flag is set, which would newline-translate binary payloads.
    flags |= getattr(os, "O_BINARY", 0)
    fd = os.open(target, flags, 0o600)
    try:
        with os.fdopen(fd, "wb") as target_file:
            written = 0
            while chunk := source_file.read(READ_CHUNK_BYTES):
                written += len(chunk)
                if written > MAX_RELEASE_ARTIFACT_BYTES:
                    raise ValueError("release artifact member too large")
                target_file.write(chunk)
    except Exception:
        # Never leave a partially written artifact behind.
        target.unlink(missing_ok=True)
        raise


def extract_release_artifacts(source: Path, output_dir: Path) -> list[Path]:
    """Unpack allowlisted release files from the downloaded artifact ZIPs.

    Args:
        source: Artifact ZIP file, or directory containing artifact ZIPs.
        output_dir: Directory that receives the extracted files; created
            when missing.

    Returns:
        The paths of all written files, sorted.

    Raises:
        ValueError: If a path involves a symlink, an archive is empty, a
            member fails validation, a member name repeats, or the file-count
            or total-size limits are exceeded.
    """
    # Check before and after creation so the directory is never a symlink.
    ensure_non_symlink_path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    ensure_non_symlink_path(output_dir)

    written_paths: list[Path] = []
    names_seen: set[str] = set()
    declared_total = 0
    for zip_path in artifact_zip_paths(source):
        with zipfile.ZipFile(zip_path) as bundle:
            entries = bundle.infolist()
            if len(entries) == 0:
                raise ValueError(f"empty release artifact zip: {zip_path}")
            for entry in entries:
                validate_member(entry)
                name = entry.filename
                if len(names_seen) >= MAX_RELEASE_ARTIFACT_FILES:
                    raise ValueError("too many release artifact files")
                declared_total = declared_total + entry.file_size
                if declared_total > MAX_TOTAL_RELEASE_ARTIFACT_BYTES:
                    raise ValueError("release artifact bundle too large")
                if name in names_seen:
                    raise ValueError(f"duplicate release artifact member: {name}")
                names_seen.add(name)
                destination = output_dir / name
                with bundle.open(entry) as payload:
                    write_new_file_without_following_symlinks(destination, payload)
                written_paths.append(destination)
    written_paths.sort()
    return written_paths


def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse command-line arguments.

    Args:
        argv: Argument strings to parse. When ``None`` (the default) the
            process arguments are used, as before.

    Returns:
        A namespace with ``source`` and ``output_dir`` as ``Path`` objects.
    """
    parser = argparse.ArgumentParser(
        description="Safely extract zipped BandScope release artifacts."
    )
    parser.add_argument(
        "source",
        type=Path,
        help="Artifact ZIP file or directory containing release artifact ZIPs",
    )
    parser.add_argument("output_dir", type=Path, help="Directory for release artifacts")
    return parser.parse_args(argv)


def main() -> None:
    """Command-line entry point: parse arguments, extract, report the count."""
    cli = parse_args()
    destination = cli.output_dir
    written = extract_release_artifacts(cli.source, destination)
    file_count = len(written)
    print(f"Extracted {file_count} release artifact files to {destination}")


if __name__ == "__main__":
main()
Loading
Loading