Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion .github/workflows/ossf-scorecard.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,13 @@ jobs:
- uses: actions/download-artifact@3e5f45b2cfb9172054b4087a40e8e0b5a5461e7c # v8.0.1
with:
name: ossf-scorecard-results
path: scorecard-sarif
path: scorecard-artifact
skip-decompress: true
- name: Safely extract Scorecard SARIF artifact
run: >-
python3 scripts/checks/extract_scorecard_artifact.py
scorecard-artifact
scorecard-sarif
- name: Normalize repository-level Scorecard SARIF locations
run: >-
python3 scripts/checks/normalize_scorecard_sarif.py
Expand Down
113 changes: 113 additions & 0 deletions scripts/checks/extract_scorecard_artifact.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
"""Safely extract the OSSF Scorecard SARIF artifact downloaded as a ZIP."""

from __future__ import annotations

import argparse
import os
import stat
import zipfile
from pathlib import Path

# The only archive member name this extractor accepts; the extracted file is
# written into the output directory under the same name.
EXPECTED_MEMBER = "results.sarif"


def resolve_artifact_zip(source: Path) -> Path:
    """Locate the artifact ZIP that ``source`` refers to.

    ``source`` is either the ZIP file itself or a directory that holds exactly
    one entry whose suffix is ``.zip``.  The source path and every candidate
    inside a directory are checked for symlinked components.

    Raises:
        ValueError: if ``source`` is neither a file nor a directory, if a
            checked path contains a symlink, or if the directory does not
            contain exactly one ``.zip`` entry.
    """
    source_is_file = source.is_file()
    if not source_is_file and not source.is_dir():
        raise ValueError(f"artifact source does not exist: {source}")
    ensure_non_symlink_path(source, path_kind="artifact path")
    if source_is_file:
        return source

    # Sort so the symlink check visits candidates in a deterministic order.
    zip_paths = sorted(entry for entry in source.iterdir() if entry.suffix == ".zip")
    for zip_path in zip_paths:
        ensure_non_symlink_path(zip_path, path_kind="artifact path")
    found = len(zip_paths)
    if found != 1:
        raise ValueError(
            f"expected exactly one Scorecard artifact zip in {source}, found {found}"
        )
    return zip_paths[0]
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Comment thread
coderabbitai[bot] marked this conversation as resolved.


def validate_member(member: zipfile.ZipInfo) -> None:
    """Accept ``member`` only if it is the plain ``results.sarif`` entry.

    Raises:
        ValueError: if the entry has any other name, is an absolute path,
            contains a ``..`` component, is a directory, or its mode bits
            mark it as a symlink.
    """
    name = member.filename
    name_path = Path(name)
    # The upper 16 bits of ``external_attr`` are read as Unix mode bits.
    mode_bits = member.external_attr >> 16
    is_acceptable = (
        name == EXPECTED_MEMBER
        and not name_path.is_absolute()
        and ".." not in name_path.parts
        and not member.is_dir()
        and not stat.S_ISLNK(mode_bits)
    )
    if not is_acceptable:
        raise ValueError(f"unexpected artifact member: {name}")


def ensure_non_symlink_path(path: Path, *, path_kind: str = "output path") -> None:
    """Reject ``path`` if it, or any ancestor that exists, is a symlink.

    The path is made absolute without resolving links, then each component is
    inspected with ``lstat`` from the filesystem root down to ``path`` itself.
    Components that do not exist are skipped.

    Args:
        path: Path to inspect; it does not have to exist.
        path_kind: Label used in the error message.

    Raises:
        ValueError: naming the first symlinked component found.
    """
    anchored = path.absolute()
    # ``parents`` is ordered nearest-first; flip it to walk root-to-leaf.
    root_to_leaf = list(anchored.parents)[::-1] + [anchored]
    for part in root_to_leaf:
        try:
            mode = os.lstat(part).st_mode
        except FileNotFoundError:
            continue
        if stat.S_ISLNK(mode):
            raise ValueError(f"symlinked {path_kind} is not allowed: {part}")


def write_new_file_without_following_symlinks(target: Path, data: bytes) -> None:
    """Create ``target`` as a new file and write ``data`` to it verbatim.

    The file is opened with ``O_CREAT | O_EXCL`` and mode ``0o600``, so the
    call fails instead of overwriting an existing entry or following a
    symlink already present at ``target``.  Platform-specific flags are added
    only where the ``os`` module defines them.

    Args:
        target: Path of the file to create; it must not exist yet.
        data: Exact bytes to store.

    Raises:
        FileExistsError: if anything already exists at ``target``.
        OSError: for any other failure to create or write the file.
    """
    flags = os.O_CREAT | os.O_EXCL | os.O_WRONLY
    # Not every platform defines O_NOFOLLOW; a missing flag contributes nothing.
    flags |= getattr(os, "O_NOFOLLOW", 0)
    # On Windows a descriptor opened without O_BINARY is in text mode and the
    # C runtime would rewrite newline bytes in ``data``; elsewhere the flag
    # does not exist and this is a no-op.
    flags |= getattr(os, "O_BINARY", 0)
    fd = os.open(target, flags, 0o600)
    with os.fdopen(fd, "wb") as target_file:
        target_file.write(data)

Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated

def extract_scorecard_artifact(source: Path, output_dir: Path) -> Path:
    """Extract the single ``results.sarif`` member into ``output_dir``.

    Args:
        source: Artifact ZIP file, or a directory containing exactly one.
        output_dir: Directory that receives ``results.sarif``; it is created
            if missing.

    Returns:
        Path of the newly written ``results.sarif``.

    Raises:
        ValueError: if the archive holds anything other than exactly one
            valid ``results.sarif`` member, or a checked path is symlinked.
    """
    with zipfile.ZipFile(resolve_artifact_zip(source)) as archive:
        entries = archive.infolist()
        for entry in entries:
            validate_member(entry)
        # Per-member validation cannot catch an empty archive or duplicates.
        if len(entries) != 1 or entries[0].filename != EXPECTED_MEMBER:
            raise ValueError("expected only results.sarif in Scorecard artifact")
        (sarif_entry,) = entries

        # Check before and after creation: mkdir may create missing parents.
        ensure_non_symlink_path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)
        ensure_non_symlink_path(output_dir)

        target = output_dir / EXPECTED_MEMBER
        payload = archive.read(sarif_entry)
        write_new_file_without_following_symlinks(target, payload)
    return target


def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the extractor's command-line arguments.

    Args:
        argv: Argument strings to parse, without the program name.  When
            ``None`` (the default) ``argparse`` reads ``sys.argv[1:]``, so
            calling ``parse_args()`` behaves as a normal script entry point.

    Returns:
        Namespace with ``source`` and ``output_dir`` converted to ``Path``.
    """
    parser = argparse.ArgumentParser(
        description="Safely extract a zipped OSSF Scorecard SARIF artifact."
    )
    parser.add_argument(
        "source",
        type=Path,
        help="Artifact ZIP file or directory containing exactly one artifact ZIP",
    )
    parser.add_argument("output_dir", type=Path, help="Directory for results.sarif")
    return parser.parse_args(argv)


def main() -> None:
    """Command-line entry point: extract the artifact and report its path."""
    cli = parse_args()
    sarif_path = extract_scorecard_artifact(cli.source, cli.output_dir)
    print(f"Extracted OSSF Scorecard SARIF to {sarif_path}")


# Run the extractor only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
177 changes: 176 additions & 1 deletion scripts/checks/verify_supply_chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@
"ossf scorecard publishing job must only contain uses steps; split run steps "
"into a separate non-publishing job"
)
# Message reported when a Scorecard artifact download step does not set
# ``skip-decompress: true`` or is not followed by the repo-owned extractor.
OSSF_DOWNLOAD_DECOMPRESSION_VIOLATION = (
    "ossf scorecard artifact download must use skip-decompress: true and "
    "repo-owned extraction before normalization"
)
# Repo-relative script paths that the workflow checks look for in step
# ``run`` commands.
OSSF_ARTIFACT_EXTRACTOR = "scripts/checks/extract_scorecard_artifact.py"
OSSF_SARIF_NORMALIZER = "scripts/checks/normalize_scorecard_sarif.py"
# NOTE(review): presumably the normalizer's output file and the exact
# ``sarif_file`` line the upload step must carry -- confirm against callers.
OSSF_NORMALIZED_SARIF = "normalized-scorecard-results.sarif"
OSSF_NORMALIZED_SARIF_UPLOAD = f"sarif_file: {OSSF_NORMALIZED_SARIF}"
Expand Down Expand Up @@ -390,7 +395,9 @@ def step_run_command(step_lines: list[str], step_indent: int) -> str:
stripped = stripped[2:].strip()
indent = len(step_line) - len(step_line.lstrip(" "))
if run_indent is None:
if stripped.startswith("run:") and (indent > step_indent or is_step_start):
if stripped.startswith("run:") and (
indent > step_indent or is_step_start
):
run_indent = indent
command_lines.append(stripped.partition(":")[2].strip())
continue
Expand Down Expand Up @@ -455,6 +462,19 @@ def workflow_job_step_blocks(line_index: int) -> list[tuple[int, int, list[str]]
if not stripped.startswith("- "):
continue
step_indent = len(line) - len(line.lstrip(" "))
if step_indent < 6:
continue
has_steps_parent = False
for previous_line in reversed(lines[:index]):
previous_stripped = previous_line.strip().partition("#")[0].strip()
previous_indent = len(previous_line) - len(previous_line.lstrip(" "))
if previous_indent >= step_indent:
continue
if previous_stripped == "steps:":
has_steps_parent = True
break
if not has_steps_parent:
continue
step_lines = [line]
for following_line in lines[index + 1 :]:
following_stripped = following_line.strip()
Expand Down Expand Up @@ -517,6 +537,158 @@ def workflow_job_step_blocks(line_index: int) -> list[tuple[int, int, list[str]]
return violations


def scorecard_artifact_download_decompression_violations(content: str) -> list[str]:
    """Return Scorecard downloads that rely on action-owned ZIP decompression.

    The workflow text is scanned line by line; no YAML parser is involved.
    Every step that mentions both ``actions/download-artifact`` and
    ``ossf-scorecard-results`` must:

    * contain ``skip-decompress: true``; and
    * be followed, within the same job, by a step whose ``run`` command is
      exactly ``python``/``python3``, the repo extractor script,
      ``scorecard-artifact`` and ``scorecard-sarif``, and that step must not
      come after the first later step that runs the SARIF normalizer.

    A job is taken to span from one two-space-indented line ending in ``:``
    to the next such line.

    Args:
        content: Full text of a workflow file.

    Returns:
        ``[OSSF_DOWNLOAD_DECOMPRESSION_VIOLATION]`` if at least one matching
        download step breaks these rules, otherwise an empty list.
    """
    lines = content.splitlines()
    content_without_comments = "\n".join(line.partition("#")[0] for line in lines)
    if "actions/download-artifact" not in content_without_comments:
        return []
    if "ossf-scorecard-results" not in content_without_comments:
        return []

    def indent_of(line: str) -> int:
        """Return the number of leading spaces on ``line``."""
        return len(line) - len(line.lstrip(" "))

    def code_of(line: str) -> str:
        """Return ``line`` stripped of surrounding space and any ``#`` comment."""
        return line.strip().partition("#")[0].strip()

    def is_job_header(line: str) -> bool:
        """Return True for a two-space-indented line that ends with a colon."""
        return indent_of(line) == 2 and code_of(line).endswith(":")

    def job_end_after(line_index: int) -> int:
        """Return the index of the first job header after ``line_index``.

        Every line from the enclosing job header up to this index belongs to
        the same job, so a range check replaces comparing whole job texts.
        """
        return next(
            (
                later_index
                for later_index in range(line_index + 1, len(lines))
                if is_job_header(lines[later_index])
            ),
            len(lines),
        )

    def has_steps_parent(index: int, step_indent: int) -> bool:
        """Return True if a shallower ``steps:`` line precedes ``index``."""
        return any(
            indent_of(previous) < step_indent and code_of(previous) == "steps:"
            for previous in reversed(lines[:index])
        )

    def collect_step_blocks() -> list[tuple[int, int, list[str]]]:
        """Return ``(line index, indent, lines)`` for each step-like list item."""
        blocks: list[tuple[int, int, list[str]]] = []
        for index, line in enumerate(lines):
            if not line.strip().startswith("- "):
                continue
            step_indent = indent_of(line)
            if step_indent < 6 or not has_steps_parent(index, step_indent):
                continue
            step_lines = [line]
            for following_line in lines[index + 1 :]:
                # The block ends at the next list item at the same or a
                # shallower indent.
                if (
                    following_line.strip().startswith("- ")
                    and indent_of(following_line) <= step_indent
                ):
                    break
                step_lines.append(following_line)
            blocks.append((index, step_indent, step_lines))
        return blocks

    def step_run_command(step_lines: list[str], step_indent: int) -> str:
        """Return the text of the step's ``run:`` value, one line per source line."""
        run_indent: int | None = None
        command_lines: list[str] = []
        for step_line in step_lines:
            stripped = code_of(step_line)
            is_step_start = stripped.startswith("- ")
            if is_step_start:
                stripped = stripped[2:].strip()
            indent = indent_of(step_line)
            if run_indent is None:
                if stripped.startswith("run:") and (
                    indent > step_indent or is_step_start
                ):
                    run_indent = indent
                    command_lines.append(stripped.partition(":")[2].strip())
                continue
            # A non-blank line at or left of ``run:`` ends the command body.
            if stripped and indent <= run_indent:
                break
            command_lines.append(stripped)
        return "\n".join(command_lines)

    def invokes_scorecard_extractor(command: str) -> bool:
        """Return True if ``command`` is exactly the expected extractor call."""
        try:
            tokens = shlex.split(command)
        except ValueError:
            # Unbalanced quotes: fall back to plain whitespace splitting.
            tokens = re.split(r"\s+", command)
        cleaned_tokens = [token.strip("'\"") for token in tokens if token.strip("'\"")]
        # Drop a leading YAML block-scalar indicator left over from ``run: >-``.
        if cleaned_tokens and cleaned_tokens[0] in {">", ">-", "|", "|-"}:
            cleaned_tokens = cleaned_tokens[1:]
        return (
            len(cleaned_tokens) == 4
            and cleaned_tokens[0] in {"python", "python3"}
            and cleaned_tokens[1:]
            == [OSSF_ARTIFACT_EXTRACTOR, "scorecard-artifact", "scorecard-sarif"]
        )

    step_blocks = collect_step_blocks()
    for index, _, step_lines in step_blocks:
        step_content = "\n".join(line.partition("#")[0] for line in step_lines)
        if "actions/download-artifact" not in step_content:
            continue
        if "ossf-scorecard-results" not in step_content:
            continue
        if "skip-decompress: true" not in step_content:
            return [OSSF_DOWNLOAD_DECOMPRESSION_VIOLATION]

        job_end = job_end_after(index)
        # Run commands of the steps that follow the download in the same job,
        # parsed once and reused for both position lookups below.
        later_commands = [
            step_run_command(block_lines, block_indent)
            for block_index, block_indent, block_lines in step_blocks
            if index < block_index < job_end
        ]
        extractor_step_position = next(
            (
                position
                for position, command in enumerate(later_commands)
                if invokes_scorecard_extractor(command)
            ),
            None,
        )
        if extractor_step_position is None:
            return [OSSF_DOWNLOAD_DECOMPRESSION_VIOLATION]
        normalizer_step_position = next(
            (
                position
                for position, command in enumerate(later_commands)
                if OSSF_SARIF_NORMALIZER in command
            ),
            None,
        )
        if (
            normalizer_step_position is not None
            and extractor_step_position > normalizer_step_position
        ):
            return [OSSF_DOWNLOAD_DECOMPRESSION_VIOLATION]
    return []


def verify_workflow_coverage() -> list[str]:
"""Return workflow trigger and artifact coverage violations."""
missing: list[str] = []
Expand Down Expand Up @@ -628,6 +800,9 @@ def verify_workflow_coverage() -> list[str]:
missing.extend(
scorecard_sarif_upload_normalization_violations(workflow_content)
)
missing.extend(
scorecard_artifact_download_decompression_violations(workflow_content)
)
missing.extend(
ossf_scorecard_publish_restriction_violations(
workflow_content, workflow_path
Expand Down
Loading
Loading