Skip to content

Commit

Permalink
sigstore: refactor, use IdentityToken everywhere (#635)
Browse files Browse the repository at this point in the history
  • Loading branch information
woodruffw authored May 24, 2023
1 parent bb61b51 commit 2593923
Show file tree
Hide file tree
Showing 9 changed files with 395 additions and 245 deletions.
25 changes: 18 additions & 7 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,10 @@ All versions prior to 0.9.0 are untracked.

## [Unreleased]

### Fixed

* Fixed a case where `sigstore verify` would fail to verify an otherwise valid
inclusion proof due to an incorrect timerange check
([#633](https://github.com/sigstore/sigstore-python/pull/633))

### Changed

* `sigstore verify` now performs additional verification of Rekor's inclusion proofs by cross-checking them against signed checkpoints
* `sigstore verify` now performs additional verification of Rekor's inclusion
proofs by cross-checking them against signed checkpoints
([#634](https://github.com/sigstore/sigstore-python/pull/634))

* A cached copy of the trust bundle is now included with the distribution
Expand All @@ -30,12 +25,28 @@ All versions prior to 0.9.0 are untracked.
bundle, rather than falling back to deprecated individual targets
([#626](https://github.com/sigstore/sigstore-python/pull/626))

* API change: the `sigstore.oidc.IdentityToken` API has been stabilized as
a wrapper for OIDC tokens
([#635](https://github.com/sigstore/sigstore-python/pull/635))

* API change: `Signer.sign` now takes a `sigstore.oidc.IdentityToken` for
its `identity` argument, rather than a "raw" OIDC token
([#635](https://github.com/sigstore/sigstore-python/pull/635))

* API change: `Issuer.identity_token` now returns a
`sigstore.oidc.IdentityToken`, rather than a "raw" OIDC token
([#635](https://github.com/sigstore/sigstore-python/pull/635))

* `sigstore verify` is not longer a backwards-compatible alias for
`sigstore verify identity`, as it was during the 1.0 release series
([#642](https://github.com/sigstore/sigstore-python/pull/642))

### Fixed

* Fixed a case where `sigstore verify` would fail to verify an otherwise valid
inclusion proof due to an incorrect timerange check
([#633](https://github.com/sigstore/sigstore-python/pull/633))

* Removed an unnecessary and backwards-incompatible parameter from the
`sigstore.oidc.detect_credential` API
([#641](https://github.com/sigstore/sigstore-python/pull/641))
Expand Down
110 changes: 63 additions & 47 deletions sigstore/_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import sys
from pathlib import Path
from textwrap import dedent
from typing import Optional, TextIO, Union, cast
from typing import NoReturn, Optional, TextIO, Union, cast

from cryptography.x509 import load_pem_x509_certificates
from sigstore_protobuf_specs.dev.sigstore.bundle.v1 import Bundle
Expand All @@ -41,6 +41,7 @@
from sigstore.oidc import (
DEFAULT_OAUTH_ISSUER_URL,
STAGING_OAUTH_ISSUER_URL,
IdentityToken,
Issuer,
detect_credential,
)
Expand All @@ -64,6 +65,15 @@
package_logger.setLevel(os.environ.get("SIGSTORE_LOGLEVEL", "INFO").upper())


def _die(args: argparse.Namespace, message: str) -> NoReturn:
"""
An `argparse` helper that fixes up the type hints on our use of
`ArgumentParser.error`.
"""
args._parser.error(message)
raise ValueError("unreachable")


def _boolify_env(envvar: str) -> bool:
"""
An `argparse` helper for turning an environment variable into a boolean.
Expand Down Expand Up @@ -525,14 +535,14 @@ def main() -> None:
elif args.verify_subcommand == "github":
_verify_github(args)
elif args.subcommand == "get-identity-token":
token = _get_identity_token(args)
if token:
print(token)
identity = _get_identity(args)
if identity:
print(identity)
else:
args._parser.error("No identity token supplied or detected!")
_die(args, "No identity token supplied or detected!")

else:
parser.error(f"Unknown subcommand: {args.subcommand}")
_die(args, f"Unknown subcommand: {args.subcommand}")
except Error as e:
e.print_and_exit(args.verbose >= 1)

Expand All @@ -545,34 +555,34 @@ def _sign(args: argparse.Namespace) -> None:
# `--no-default-files` has no effect on `--bundle`, but we forbid it because
# it indicates user confusion.
if args.no_default_files and has_bundle:
args._parser.error("--no-default-files may not be combined with --bundle.")
_die(args, "--no-default-files may not be combined with --bundle.")

# Fail if `--signature` or `--certificate` is specified *and* we have more
# than one input.
if (has_sig or has_crt or has_bundle) and len(args.files) > 1:
args._parser.error(
_die(
args,
"Error: --signature, --certificate, and --bundle can't be used with "
"explicit outputs for multiple inputs.",
)

if args.output_directory and (has_sig or has_crt or has_bundle):
args._parser.error(
_die(
args,
"Error: --signature, --certificate, and --bundle can't be used with "
"an explicit output directory.",
)

# Fail if either `--signature` or `--certificate` is specified, but not both.
if has_sig ^ has_crt:
args._parser.error(
"Error: --signature and --certificate must be used together."
)
_die(args, "Error: --signature and --certificate must be used together.")

# Build up the map of inputs -> outputs ahead of any signing operations,
# so that we can fail early if overwriting without `--overwrite`.
output_map = {}
for file in args.files:
if not file.is_file():
args._parser.error(f"Input must be a file: {file}")
_die(args, f"Input must be a file: {file}")

sig, cert, bundle = (
args.signature,
Expand All @@ -582,9 +592,7 @@ def _sign(args: argparse.Namespace) -> None:

output_dir = args.output_directory if args.output_directory else file.parent
if output_dir.exists() and not output_dir.is_dir():
args._parser.error(
f"Output directory exists and is not a directory: {output_dir}"
)
_die(args, f"Output directory exists and is not a directory: {output_dir}")
output_dir.mkdir(parents=True, exist_ok=True)

if not bundle and not args.no_default_files:
Expand All @@ -600,9 +608,10 @@ def _sign(args: argparse.Namespace) -> None:
extants.append(str(bundle))

if extants:
args._parser.error(
_die(
args,
"Refusing to overwrite outputs without --overwrite: "
f"{', '.join(extants)}"
f"{', '.join(extants)}",
)

output_map[file] = {
Expand Down Expand Up @@ -638,22 +647,26 @@ def _sign(args: argparse.Namespace) -> None:
rekor=RekorClient(args.rekor_url, rekor_keyring, ct_keyring),
)

# The order of precedence is as follows:
# The order of precedence for identities is as follows:
#
# 1) Explicitly supplied identity token
# 2) Ambient credential detected in the environment, unless disabled
# 3) Interactive OAuth flow
if not args.identity_token:
args.identity_token = _get_identity_token(args)
if not args.identity_token:
args._parser.error("No identity token supplied or detected!")
identity: IdentityToken | None
if args.identity_token:
identity = IdentityToken(args.identity_token)
else:
identity = _get_identity(args)

if not identity:
_die(args, "No identity token supplied or detected!")

for file, outputs in output_map.items():
logger.debug(f"signing for {file.name}")
with file.open(mode="rb", buffering=0) as io:
result = signer.sign(
input_=io,
identity_token=args.identity_token,
identity=identity,
)

print("Using ephemeral certificate:")
Expand Down Expand Up @@ -696,21 +709,22 @@ def _collect_verification_state(
# Fail if --certificate, --signature, or --bundle is specified and we
# have more than one input.
if (args.certificate or args.signature or args.bundle) and len(args.files) > 1:
args._parser.error(
_die(
args,
"--certificate, --signature, or --bundle can only be used "
"with a single input file"
"with a single input file",
)

# Fail if `--certificate` or `--signature` is used with `--bundle`.
if args.bundle and (args.certificate or args.signature):
args._parser.error("--bundle cannot be used with --certificate or --signature")
_die(args, "--bundle cannot be used with --certificate or --signature")

# The converse of `sign`: we build up an expected input map and check
# that we have everything so that we can fail early.
input_map = {}
for file in args.files:
if not file.is_file():
args._parser.error(f"Input must be a file: {file}")
_die(args, f"Input must be a file: {file}")

sig, cert, bundle = (
args.signature,
Expand Down Expand Up @@ -740,8 +754,9 @@ def _collect_verification_state(
input_map[file] = {"bundle": bundle}

if missing:
args._parser.error(
f"Missing verification materials for {(file)}: {', '.join(missing)}"
_die(
args,
f"Missing verification materials for {(file)}: {', '.join(missing)}",
)

if args.staging:
Expand All @@ -751,16 +766,14 @@ def _collect_verification_state(
verifier = Verifier.production()
else:
if not args.certificate_chain:
args._parser.error(
"Custom Rekor URL used without specifying --certificate-chain"
)
_die(args, "Custom Rekor URL used without specifying --certificate-chain")

try:
certificate_chain = load_pem_x509_certificates(
args.certificate_chain.read()
)
except ValueError as error:
args._parser.error(f"Invalid certificate chain: {error}")
_die(args, f"Invalid certificate chain: {error}")

if args.rekor_root_pubkey is not None:
rekor_keys = [args.rekor_root_pubkey.read()]
Expand Down Expand Up @@ -931,24 +944,27 @@ def _verify_github(args: argparse.Namespace) -> None:
raise VerificationError(cast(VerificationFailure, result))


def _get_identity_token(args: argparse.Namespace) -> Optional[str]:
def _get_identity(args: argparse.Namespace) -> Optional[IdentityToken]:
token = None
if not args.oidc_disable_ambient_providers:
token = detect_credential()

if not token:
if args.staging:
issuer = Issuer.staging()
elif args.oidc_issuer == DEFAULT_OAUTH_ISSUER_URL:
issuer = Issuer.production()
else:
issuer = Issuer(args.oidc_issuer)
# Happy path: we've detected an ambient credential, so we can return early.
if token:
return IdentityToken(token)

if args.oidc_client_secret is None:
args.oidc_client_secret = "" # nosec: B105
if args.staging:
issuer = Issuer.staging()
elif args.oidc_issuer == DEFAULT_OAUTH_ISSUER_URL:
issuer = Issuer.production()
else:
issuer = Issuer(args.oidc_issuer)

token = issuer.identity_token(
client_id=args.oidc_client_id, client_secret=args.oidc_client_secret
)
if args.oidc_client_secret is None:
args.oidc_client_secret = "" # nosec: B105

token = issuer.identity_token(
client_id=args.oidc_client_id, client_secret=args.oidc_client_secret
)

return token
5 changes: 3 additions & 2 deletions sigstore/_internal/fulcio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
from pydantic import BaseModel, Field, validator

from sigstore._utils import B64Str
from sigstore.oidc import IdentityToken

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -208,14 +209,14 @@ class FulcioSigningCert(_Endpoint):
"""

def post(
self, req: CertificateSigningRequest, token: str
self, req: CertificateSigningRequest, identity: IdentityToken
) -> FulcioCertificateSigningResponse:
"""
Get the signing certificate, using an X.509 Certificate
Signing Request.
"""
headers = {
"Authorization": f"Bearer {token}",
"Authorization": f"Bearer {identity}",
"Content-Type": "application/json",
"Accept": "application/pem-certificate-chain",
}
Expand Down
56 changes: 1 addition & 55 deletions sigstore/_internal/oidc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,59 +13,5 @@
# limitations under the License.

"""
OIDC functionality for sigstore-python.
Internal OIDC and OAuth functionality for sigstore-python.
"""

import jwt
from id import IdentityError

# See: https://github.com/sigstore/fulcio/blob/b2186c0/pkg/config/config.go#L182-L201
_KNOWN_OIDC_ISSUERS = {
"https://accounts.google.com": "email",
"https://oauth2.sigstore.dev/auth": "email",
"https://oauth2.sigstage.dev/auth": "email",
"https://token.actions.githubusercontent.com": "sub",
}
DEFAULT_AUDIENCE = "sigstore"


class Identity:
"""
A wrapper for an OIDC "identity", as extracted from an OIDC token.
"""

def __init__(self, identity_token: str) -> None:
"""
Create a new `Identity` from the given OIDC token.
"""
identity_jwt = jwt.decode(identity_token, options={"verify_signature": False})

self.issuer = identity_jwt.get("iss")
if self.issuer is None:
raise IdentityError("Identity token missing the required `iss` claim")

if "aud" not in identity_jwt:
raise IdentityError("Identity token missing the required `aud` claim")

aud = identity_jwt.get("aud")

if aud != DEFAULT_AUDIENCE:
raise IdentityError(f"Audience should be {DEFAULT_AUDIENCE!r}, not {aud!r}")

# When verifying the private key possession proof, Fulcio uses
# different claims depending on the token's issuer.
# We currently special-case a handful of these, and fall back
# on signing the "sub" claim otherwise.
proof_claim = _KNOWN_OIDC_ISSUERS.get(self.issuer)
if proof_claim is not None:
if proof_claim not in identity_jwt:
raise IdentityError(
f"Identity token missing the required `{proof_claim!r}` claim"
)

self.proof = str(identity_jwt.get(proof_claim))
else:
try:
self.proof = str(identity_jwt["sub"])
except KeyError:
raise IdentityError("Identity token missing `sub` claim")
Loading

0 comments on commit 2593923

Please sign in to comment.