Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sigstore: use our own Statement type #930

Merged
merged 11 commits into from
Mar 13, 2024
2 changes: 0 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ dependencies = [
"cryptography >= 42",
"id >= 1.1.0",
"importlib_resources ~= 5.7; python_version < '3.11'",
"in-toto-attestation == 0.9.3",
"pydantic >= 2,< 3",
"pyjwt >= 2.1",
"pyOpenSSL >= 23.0.0",
Expand Down Expand Up @@ -65,7 +64,6 @@ lint = [
# and let Dependabot periodically perform this update.
"ruff < 0.3.2",
"types-requests",
"types-protobuf",
"types-pyOpenSSL",
]
doc = ["pdoc"]
Expand Down
49 changes: 0 additions & 49 deletions sigstore/_internal/dsse.py

This file was deleted.

2 changes: 1 addition & 1 deletion sigstore/_internal/rekor/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def __init__(self, http_error: requests.HTTPError):
"""
Create a new `RekorClientError` from the given `requests.HTTPError`.
"""
if http_error.response:
if http_error.response is not None:
try:
error = rekor_types.Error.model_validate_json(http_error.response.text)
super().__init__(f"{error.code}: {error.message}")
Expand Down
184 changes: 184 additions & 0 deletions sigstore/dsse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
# Copyright 2022 The Sigstore Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Functionality for building and manipulating in-toto Statements and DSSE envelopes.
"""

from __future__ import annotations

import logging
from typing import Any, Literal, Union

from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec
from pydantic import BaseModel, ConfigDict, Field, RootModel, StrictStr, ValidationError
from sigstore_protobuf_specs.io.intoto import Envelope, Signature

logger = logging.getLogger(__name__)

_Digest = Union[
Literal["sha256"],
Literal["sha384"],
Literal["sha512"],
Literal["sha3_256"],
Literal["sha3_384"],
Literal["sha3_512"],
]
"""
NOTE: in-toto's DigestSet contains all kinds of hash algorithms that
we intentionally do not support. This model is limited to common members of the
SHA-2 and SHA-3 family that are at least as strong as SHA-256.

See: <https://github.com/in-toto/attestation/blob/main/spec/v1/digest_set.md>
"""

_DigestSet = RootModel[dict[_Digest, str]]
"""
An internal validation model for in-toto subject digest sets.
"""


class _Subject(BaseModel):
"""
A single in-toto statement subject.
"""

name: StrictStr | None
digest: _DigestSet = Field(...)


class _Statement(BaseModel):
"""
An internal validation model for in-toto statements.
"""

model_config = ConfigDict(populate_by_name=True)

type_: Literal["https://in-toto.io/Statement/v1"] = Field(..., alias="_type")
subjects: list[_Subject] = Field(..., min_length=1, alias="subject")
predicate_type: StrictStr = Field(..., alias="predicateType")
predicate: dict[str, Any] | None = Field(None, alias="predicate")


class Statement:
"""
Represents an in-toto statement.

This type deals with opaque bytes to ensure that the encoding does not
change, but Statements are internally checked for conformance against
the JSON object layout defined in the in-toto attesation spec.

See: <https://github.com/in-toto/attestation/blob/main/spec/v1/statement.md>
"""

_ENVELOPE_TYPE = "application/vnd.in-toto+json"

def __init__(self, contents: bytes) -> None:
"""
Construct a new Statement.

This takes an opaque `bytes` containing the statement; use
`StatementBuilder` to manually construct an in-toto statement
from constituent pieces.
"""
self._contents = contents
try:
self._statement = _Statement.model_validate_json(contents)
except ValidationError:
raise ValueError("malformed in-toto statement")

def _pae(self) -> bytes:
"""
Construct the PAE encoding for this statement.
"""

# See:
# https://github.com/secure-systems-lab/dsse/blob/v1.0.0/envelope.md
# https://github.com/in-toto/attestation/blob/v1.0/spec/v1.0/envelope.md
pae = f"DSSEv1 {len(Statement._ENVELOPE_TYPE)} {Statement._ENVELOPE_TYPE} ".encode()
pae += b" ".join([str(len(self._contents)).encode(), self._contents])
return pae

def sign(self, key: ec.EllipticCurvePrivateKey) -> Envelope:
"""
Sign the statement, returning a DSSE envelope containing the statement's
signature.
"""

pae = self._pae()
logger.debug(f"DSSE PAE: {pae!r}")

signature = key.sign(pae, ec.ECDSA(hashes.SHA256()))
return Envelope(
payload=self._contents,
payload_type=Statement._ENVELOPE_TYPE,
signatures=[Signature(sig=signature, keyid=None)],
)


class _StatementBuilder:
Copy link
Member Author

@woodruffw woodruffw Mar 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NB: We will probably want to make this API public at some point, but not yet (it needs more design consideration).

"""
A builder-style API for constructing in-toto Statements.
"""

def __init__(
self,
subjects: list[_Subject] | None = None,
predicate_type: str | None = None,
predicate: dict[str, Any] | None = None,
):
"""
Create a new `_StatementBuilder`.
"""
self._subjects = subjects or []
self._predicate_type = predicate_type
self._predicate = predicate

def subjects(self, subjects: list[_Subject]) -> _StatementBuilder:
"""
Configure the subjects for this builder.
"""
self._subjects = subjects
return self

def predicate_type(self, predicate_type: str) -> _StatementBuilder:
"""
Configure the predicate type for this builder.
"""
self._predicate_type = predicate_type
return self

def predicate(self, predicate: dict[str, Any]) -> _StatementBuilder:
"""
Configure the predicate for this builder.
"""
self._predicate = predicate
return self

def build(self) -> Statement:
"""
Build a `Statement` from the builder's state.
"""
try:
stmt = _Statement(
type_="https://in-toto.io/Statement/v1",
subjects=self._subjects,
predicate_type=self._predicate_type,
predicate=self._predicate,
)
except ValidationError as e:
raise ValueError(f"invalid statement: {e}")

return Statement(stmt.model_dump_json(by_alias=True).encode())
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A consideration here: technically we make no canonical/serialization guarantees other than "it's valid JSON", but we could do RFC 8785 for good measure here.

9 changes: 4 additions & 5 deletions sigstore/sign.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.x509.oid import NameOID
from in_toto_attestation.v1.statement import Statement
from sigstore_protobuf_specs.dev.sigstore.bundle.v1 import (
Bundle,
VerificationMaterial,
Expand All @@ -71,8 +70,8 @@
)
from sigstore_protobuf_specs.io.intoto import Envelope

from sigstore import dsse
from sigstore import hashes as sigstore_hashes
from sigstore._internal import dsse
from sigstore._internal.fulcio import (
ExpiredCertificate,
FulcioCertificateSigningResponse,
Expand Down Expand Up @@ -174,7 +173,7 @@ def _signing_cert(

def sign(
self,
input_: bytes | Statement | sigstore_hashes.Hashed,
input_: bytes | dsse.Statement | sigstore_hashes.Hashed,
) -> Bundle:
"""
Sign an input, and return a `Bundle` corresponding to the signed result.
Expand Down Expand Up @@ -217,8 +216,8 @@ def sign(
# Sign artifact
content: MessageSignature | Envelope
proposed_entry: rekor_types.Hashedrekord | rekor_types.Dsse
if isinstance(input_, Statement):
content = dsse.sign_intoto(private_key, input_)
if isinstance(input_, dsse.Statement):
content = input_.sign(private_key)

# Create the proposed DSSE entry
proposed_entry = rekor_types.Dsse(
Expand Down
25 changes: 25 additions & 0 deletions test/unit/test_sign.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import sigstore.oidc
from sigstore._internal.keyring import KeyringError, KeyringLookupError
from sigstore._internal.sct import InvalidSCTError, InvalidSCTKeyError
from sigstore.dsse import _StatementBuilder, _Subject
from sigstore.hashes import Hashed
from sigstore.sign import SigningContext
from sigstore.verify.models import VerificationMaterials
Expand Down Expand Up @@ -148,3 +149,27 @@ def test_sign_prehashed(staging):
verifier.verify(input_, materials=materials, policy=UnsafeNoOp())
# verifying against the prehash also works
verifier.verify(hashed, materials=materials, policy=UnsafeNoOp())


@pytest.mark.online
@pytest.mark.ambient_oidc
def test_sign_dsse(staging):
sign_ctx, _, identity = staging

ctx = sign_ctx()
stmt = (
_StatementBuilder()
.subjects(
[_Subject(name="null", digest={"sha256": hashlib.sha256(b"").hexdigest()})]
)
.predicate_type("https://cosign.sigstore.dev/attestation/v1")
.predicate(
{
"Data": "",
"Timestamp": "2023-12-07T00:37:58Z",
}
)
).build()
jku marked this conversation as resolved.
Show resolved Hide resolved

with ctx.signer(identity) as signer:
signer.sign(stmt)
Loading