diff --git a/CHANGELOG.md b/CHANGELOG.md index c81b18a3..4ef34069 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ All versions prior to 1.0.0 are untracked. ### Added -Added the `digest` subcommand to compute and print a model's digest. This enables other tools to easily pair the attestations with a model directory. +- Added support for signing with AWS KMS through KMS URIs. Install with `pip install model-signing[kms]` to enable this functionality. ### Changed - Standardized CLI flags to use hyphens (e.g., `--trust-config` instead of `--trust_config`). Underscore variants are still accepted for backwards compatibility via token normalization. diff --git a/Containerfile b/Containerfile index 724a3cc5..c3d7ad9a 100644 --- a/Containerfile +++ b/Containerfile @@ -38,7 +38,7 @@ COPY src /app/src COPY pyproject.toml /app/ COPY README.md /app/ COPY LICENSE /app/ -RUN pip install .[pkcs11,otel] +RUN pip install .[pkcs11,otel,kms] FROM base AS minimal_image COPY --from=minimal_install /usr/local/bin /usr/local/bin diff --git a/README.md b/README.md index f1417a21..a6828db3 100644 --- a/README.md +++ b/README.md @@ -35,7 +35,8 @@ We support generating signatures via [Sigstore](https://www.sigstore.dev/), a tool for making code signatures transparent without requiring management of cryptographic key material. But we also support traditional signing methods, so models can be signed with public keys or signing certificates as well as -PKCS #11 enabled devices *(install with `pip install model-signing[pkcs11]` to enable this functionality)*. +PKCS #11 enabled devices *(install with `pip install model-signing[pkcs11]` to enable this functionality)* +and AWS KMS *(install with `pip install model-signing[kms]` to enable this functionality)*. The signing part creates a [sigstore bundle](https://github.com/sigstore/protobuf-specs/blob/main/protos/sigstore_bundle.proto) @@ -255,6 +256,45 @@ the PKCS #11 device and store it in a file in PEM format. With can then use: --public-key key.pub /path/to/your/model ``` +#### Signing with AWS KMS + +Signing with AWS Key Management Service (KMS) is supported through KMS URIs. +The URI specifies the key location in AWS KMS. + +The URI format is: `kms://aws/?region=` + +- The key-id-or-arn can be either: + - A simple key ID (e.g., `f26f2baa-8865-459d-a275-8fca1d15119f`) + - A full key ARN (e.g., `arn:aws:kms:us-east-1:123456789012:key/f26f2baa-8865-459d-a275-8fca1d15119f`) +- Region is optional and defaults to the configured AWS region +- The signing key must be of type NIST P256/384/521 (secp256r1/secp384r1/secp521r1) + +With a KMS URI, we can use the following for signing: + +```bash +[...]$ model_signing sign kms-key --signature model.sig \ + --kms-uri "kms://aws/arn:aws:kms:us-east-1:123456789012:key/12345678-1234-1234-1234-123456789012" \ + /path/to/your/model +``` + +For signature verification, retrieve the public key from AWS KMS and store it +in a file in PEM format. Then use: + +```bash +[...]$ model_signing verify key --signature model.sig \ + --public_key key.pub /path/to/your/model +``` + +To install AWS KMS support: + +```bash +[...]$ pip install model-signing[kms] +# or just: pip install boto3 +``` + +For additional KMS providers (GCP, Azure, etc.), please open an issue at: +https://github.com/sigstore/model-transparency/issues + #### OpenTelemetry Support Model signing supports optional distributed tracing and observability through OpenTelemetry. This allows you to monitor signing operations, track performance, and integrate with observability platforms. diff --git a/pyproject.toml b/pyproject.toml index 04f913c9..1e2d0e78 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,6 +52,9 @@ keywords = [ pkcs11 = [ "PyKCS11", ] +kms = [ + "boto3", +] otel = [ "opentelemetry-api", "opentelemetry-sdk", @@ -85,6 +88,7 @@ randomize = true extra-args = ["-m", "not integration"] features = [ "pkcs11", + "kms", ] [[tool.hatch.envs.hatch-test.matrix]] @@ -135,6 +139,7 @@ extra-dependencies = [ ] features = [ "pkcs11", + "kms", ] installer = "pip" python = "3.12" diff --git a/src/model_signing/_cli.py b/src/model_signing/_cli.py index 94da05be..0ba2804d 100644 --- a/src/model_signing/_cli.py +++ b/src/model_signing/_cli.py @@ -180,6 +180,7 @@ class _PKICmdGroup(click.Group): "certificate", "pkcs11-key", "pkcs11-certificate", + "kms-key", ] def get_command( @@ -546,6 +547,68 @@ def _sign_pkcs11_key( click.echo("Signing succeeded") +# Decorator for the commonly used option to set a KMS URI +_kms_uri_option = click.option( + "--kms-uri", + type=str, + metavar="KMS_URI", + required=True, + help=( + "AWS KMS URI of the signing key (e.g., kms://aws/key-id, " + "kms://aws/arn:aws:kms:region:account-id:key/key-id?region=us-east-1)." + ), +) + + +@_sign.command(name="kms-key") +@_model_path_argument +@_ignore_paths_option +@_ignore_git_paths_option +@_allow_symlinks_option +@_write_signature_option +@_kms_uri_option +def _sign_kms_key( + model_path: pathlib.Path, + ignore_paths: Iterable[pathlib.Path], + ignore_git_paths: bool, + allow_symlinks: bool, + signature: pathlib.Path, + kms_uri: str, +) -> None: + """Sign using a private key stored in AWS KMS. + + Signing the model at MODEL_PATH, produces the signature at SIGNATURE_PATH + (as per `--signature` option). Files in IGNORE_PATHS are not part of the + signature. + + Pass the AWS KMS URI using `--kms-uri`. The URI format is: + kms://aws/?region= + + For additional KMS providers, please open an issue at: + https://github.com/sigstore/model-transparency/issues + + Note that this method does not provide a way to tie to the identity of the + signer, outside of pairing the keys. Also note that we don't offer key + management protocols. + """ + try: + ignored = _resolve_ignore_paths( + model_path, list(ignore_paths) + [signature] + ) + model_signing.signing.Config().use_kms_signer( + kms_uri=kms_uri + ).set_hashing_config( + model_signing.hashing.Config() + .set_ignored_paths(paths=ignored, ignore_git_paths=ignore_git_paths) + .set_allow_symlinks(allow_symlinks) + ).sign(model_path, signature) + except Exception as err: + click.echo(f"Signing failed with error: {err}", err=True) + sys.exit(1) + + click.echo("Signing succeeded") + + @_sign.command(name="certificate") @_model_path_argument @_ignore_paths_option diff --git a/src/model_signing/_signing/sign_kms.py b/src/model_signing/_signing/sign_kms.py new file mode 100644 index 00000000..fd83e42d --- /dev/null +++ b/src/model_signing/_signing/sign_kms.py @@ -0,0 +1,209 @@ +# Copyright 2025 The Sigstore Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import base64 +import hashlib +from urllib.parse import parse_qs +from urllib.parse import urlparse + +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import ec +from google.protobuf import json_format +from sigstore_models import intoto as intoto_pb +from sigstore_models.bundle import v1 as bundle_pb +from sigstore_models.common import v1 as common_pb +from typing_extensions import override + +from model_signing._signing import sign_ec_key as ec_key +from model_signing._signing import sign_sigstore_pb as sigstore_pb +from model_signing._signing import signing +from model_signing._signing.sign_ec_key import _check_supported_ec_key + + +class AWSKMSBackend: + """AWS KMS backend for signing with keys stored in AWS KMS.""" + + def __init__(self, key_id: str, region: str | None = None): + """Initializes the AWS KMS backend. + + Args: + key_id: The AWS KMS key ID or ARN. + region: Optional AWS region. If not provided, uses default region. + """ + try: + import boto3 + except ImportError as e: + raise RuntimeError( + "AWS KMS support requires 'boto3'. " + "Install with 'pip install boto3'." + ) from e + + self._key_id = key_id + self._kms_client = boto3.client("kms", region_name=region) + self._public_key = self._get_public_key() + + def _get_public_key(self) -> ec.EllipticCurvePublicKey: + response = self._kms_client.get_public_key(KeyId=self._key_id) + public_key_der = response["PublicKey"] + public_key = serialization.load_der_public_key(public_key_der) + _check_supported_ec_key(public_key) + return public_key + + def sign(self, digest: bytes) -> bytes: + response = self._kms_client.sign( + KeyId=self._key_id, + Message=digest, + MessageType="DIGEST", + SigningAlgorithm=( + "ECDSA_SHA_256" + if self._public_key.curve.name == "secp256r1" + else "ECDSA_SHA_384" + if self._public_key.curve.name == "secp384r1" + else "ECDSA_SHA_512" + ), + ) + sig_bytes = response["Signature"] + return sig_bytes + + def get_public_key(self) -> ec.EllipticCurvePublicKey: + return self._public_key + + +def _parse_kms_uri(kms_uri: str) -> tuple[str, dict[str, str]]: + """Parses a KMS URI into provider and parameters. + + Supported formats: + - kms://aws/?region= + + For additional KMS providers (GCP, Azure, etc.), please open an issue at: + https://github.com/sigstore/model-transparency/issues + + Args: + kms_uri: The KMS URI to parse. + + Returns: + A tuple of (provider, parameters dict). + """ + parsed = urlparse(kms_uri) + if parsed.scheme != "kms": + raise ValueError(f"Invalid KMS URI scheme: {parsed.scheme}") + + provider = parsed.netloc + path_parts = [p for p in parsed.path.split("/") if p] + query_params = parse_qs(parsed.query) + + params = {} + if provider == "aws": + if len(path_parts) == 0: + raise ValueError( + "AWS KMS URI must have format: kms://aws/" + ) + key_id = "/".join(path_parts) + if key_id.startswith("arn:aws:kms:"): + arn_parts = key_id.split(":") + if len(arn_parts) != 6 or arn_parts[5].split("/")[0] != "key": + raise ValueError( + "AWS KMS ARN must have format: " + "arn:aws:kms:::key/" + ) + elif "/" in key_id: + raise ValueError( + "AWS KMS URI must be either a full ARN " + "(arn:aws:kms:...) or a simple key ID" + ) + params["key_id"] = key_id + if "region" in query_params: + params["region"] = query_params["region"][0] + else: + raise ValueError( + f"Unsupported KMS provider: {provider}. " + "Currently only AWS KMS is supported. " + "For other providers, please open an issue at: " + "https://github.com/sigstore/model-transparency/issues" + ) + + return provider, params + + +class Signer(sigstore_pb.Signer): + """Signer using KMS URIs with elliptic curve keys.""" + + def __init__(self, kms_uri: str): + """Initializes the KMS signer. + + Args: + kms_uri: The KMS URI specifying the provider and key. + """ + provider, params = _parse_kms_uri(kms_uri) + + if provider == "aws": + self._backend = AWSKMSBackend( + params["key_id"], params.get("region") + ) + else: + raise ValueError(f"Unsupported KMS provider: {provider}") + + self._public_key = self._backend.get_public_key() + + def public_key(self): + """Get the python cryptography public key.""" + return self._public_key + + @override + def sign(self, payload: signing.Payload) -> signing.Signature: + raw_payload = json_format.MessageToJson(payload.statement.pb).encode( + "utf-8" + ) + + hash_alg = ec_key.get_ec_key_hash(self._public_key) + pae_payload = sigstore_pb.pae(raw_payload) + + hash_obj = hashes.Hash(hash_alg) + hash_obj.update(pae_payload) + digest = hash_obj.finalize() + + sig = self._backend.sign(digest) + + raw_signature = intoto_pb.Signature(sig=base64.b64encode(sig), keyid="") + + envelope = intoto_pb.Envelope( + payload=base64.b64encode(raw_payload), + payload_type=signing._IN_TOTO_JSON_PAYLOAD_TYPE, + signatures=[raw_signature], + ) + + return sigstore_pb.Signature( + bundle_pb.Bundle( + media_type=sigstore_pb._BUNDLE_MEDIA_TYPE, + verification_material=self._get_verification_material(), + dsse_envelope=envelope, + ) + ) + + def _get_verification_material(self) -> bundle_pb.VerificationMaterial: + """Returns the verification material to include in the bundle.""" + public_key = self._public_key + + raw_bytes = public_key.public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ) + + hash_bytes = hashlib.sha256(raw_bytes).digest().hex() + + return bundle_pb.VerificationMaterial( + public_key=common_pb.PublicKeyIdentifier(hint=hash_bytes), + tlog_entries=[], + ) diff --git a/src/model_signing/_signing/sign_pkcs11.py b/src/model_signing/_signing/sign_pkcs11.py index b4213251..ab887971 100644 --- a/src/model_signing/_signing/sign_pkcs11.py +++ b/src/model_signing/_signing/sign_pkcs11.py @@ -24,7 +24,6 @@ from cryptography import x509 from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives import serialization -from cryptography.hazmat.primitives.asymmetric import ec from google.protobuf import json_format import PyKCS11 from sigstore_models import intoto as intoto_pb @@ -36,6 +35,7 @@ from model_signing._signing import sign_sigstore_pb as sigstore_pb from model_signing._signing import signing from model_signing._signing.pkcs11uri import Pkcs11URI +from model_signing._signing.sign_ec_key import _check_supported_ec_key MODULE_PATHS: Iterable[str] = [ @@ -44,24 +44,6 @@ ] -def _check_supported_ec_key(public_key: ec.EllipticCurvePublicKey): - """Checks if the elliptic curve key is supported by our package. - - We only support a family of curves, trying to match those specified by - Sigstore's protobuf specs. - See https://github.com/sigstore/model-transparency/issues/385. - - Args: - public_key: The public key to check. Can be obtained from a private key. - - Raises: - ValueError: The key is not supported. - """ - curve = public_key.curve.name - if curve not in ["secp256r1", "secp384r1", "secp521r1"]: - raise ValueError(f"Unsupported key for curve '{curve}'") - - def encode_ec_public_key(public_key: PyKCS11.CK_OBJECT_HANDLE) -> PublicKeyInfo: ec_params, ec_point = public_key.session.getAttributeValue( public_key, [PyKCS11.CKA_EC_PARAMS, PyKCS11.CKA_EC_POINT] diff --git a/src/model_signing/signing.py b/src/model_signing/signing.py index 4de79be2..be06c19a 100644 --- a/src/model_signing/signing.py +++ b/src/model_signing/signing.py @@ -295,3 +295,31 @@ def use_pkcs11_certificate_signer( module_paths=module_paths, ) return self + + def use_kms_signer(self, *, kms_uri: str) -> Self: + """Configures the signing to be performed using AWS KMS. + + The signer in this configuration is changed to one that performs signing + using a key stored in AWS Key Management Service (KMS). + + The KMS URI format is: kms://aws/?region= + + For additional KMS providers, please open an issue at: + https://github.com/sigstore/model-transparency/issues + + Args: + kms_uri: The AWS KMS URI specifying the key. + + Return: + The new signing configuration. + """ + try: + from model_signing._signing import sign_kms as kms_signer + except ImportError as e: + raise RuntimeError( + "AWS KMS functionality requires boto3. " + "Install with 'pip install model-signing[kms]' or " + "'pip install boto3'." + ) from e + self._signer = kms_signer.Signer(kms_uri) + return self diff --git a/tests/_signing/kms_test.py b/tests/_signing/kms_test.py new file mode 100644 index 00000000..5cbb589a --- /dev/null +++ b/tests/_signing/kms_test.py @@ -0,0 +1,64 @@ +# Copyright 2025 The Sigstore Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + + +class TestKMSSigning: + def test_kms_uri_parsing(self): + from model_signing._signing.sign_kms import _parse_kms_uri + + provider, params = _parse_kms_uri("kms://aws/key-id") + assert provider == "aws" + assert params["key_id"] == "key-id" + + provider, params = _parse_kms_uri("kms://aws/key-id?region=us-east-1") + assert provider == "aws" + assert params["key_id"] == "key-id" + assert params["region"] == "us-east-1" + + provider, params = _parse_kms_uri( + "kms://aws/arn:aws:kms:us-east-1:123456789012:key/" + "f26f2baa-8865-459d-a275-8fca1d15119f" + ) + assert provider == "aws" + expected_arn = ( + "arn:aws:kms:us-east-1:123456789012:key/" + "f26f2baa-8865-459d-a275-8fca1d15119f" + ) + assert params["key_id"] == expected_arn + + def test_invalid_kms_uri(self): + from model_signing._signing.sign_kms import _parse_kms_uri + + with pytest.raises(ValueError, match="Invalid KMS URI scheme"): + _parse_kms_uri("invalid://aws/key") + + with pytest.raises(ValueError, match="Unsupported KMS provider"): + _parse_kms_uri("kms://unknown/provider") + + with pytest.raises(ValueError, match="Unsupported KMS provider"): + _parse_kms_uri("kms://gcp/project/location/keyring/key") + + with pytest.raises(ValueError, match="Unsupported KMS provider"): + _parse_kms_uri("kms://azure/vault/key") + + with pytest.raises(ValueError, match="Unsupported KMS provider"): + _parse_kms_uri("kms://file/path/to/key.pem") + + with pytest.raises(ValueError, match="AWS KMS URI must be either"): + _parse_kms_uri("kms://aws/key-id/extra") + + with pytest.raises(ValueError, match="AWS KMS ARN must have format"): + _parse_kms_uri("kms://aws/arn:aws:kms:invalid") diff --git a/tests/api_test.py b/tests/api_test.py index 1195d9f1..a1a34586 100644 --- a/tests/api_test.py +++ b/tests/api_test.py @@ -44,19 +44,6 @@ ] -@pytest.fixture -def base_path() -> Path: - return Path(__file__).parent - - -@pytest.fixture -def populate_tmpdir(tmp_path: Path) -> Path: - Path(tmp_path / "signme-1").write_text("signme-1") - Path(tmp_path / "signme-2").write_text("signme-2") - Path(tmp_path / ".gitignore").write_text(".foo") - return tmp_path - - def get_signed_files(modelsig: Path) -> list[str]: with open(modelsig, "r") as file: signature = json.load(file) @@ -262,6 +249,7 @@ def test_sign_and_verify(self, base_path, populate_tmpdir): assert get_signed_files(signature) == [ ".gitignore", + "ignored", "signme-1", "signme-2", ] @@ -330,6 +318,7 @@ def test_sign_and_verify(self, base_path, populate_tmpdir): assert get_signed_files(signature) == [ ".gitignore", + "ignored", "signme-1", "signme-2", ] @@ -401,6 +390,7 @@ def test_sign_and_verify_sharded(self, base_path, populate_tmpdir): assert get_signed_files(signature) == [ ".gitignore:0:4", + "ignored:0:7", "signme-1:0:8", "signme-2:0:8", ] diff --git a/tests/conftest.py b/tests/conftest.py index 4ac71b3c..d4391400 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -121,3 +121,19 @@ def symlink_model_folder( symlink_file = model_dir / "symlink_file" os.symlink(external_file.absolute(), symlink_file.absolute()) return model_dir + + +@pytest.fixture +def base_path() -> pathlib.Path: + """Base path for tests.""" + return pathlib.Path(__file__).parent + + +@pytest.fixture +def populate_tmpdir(tmp_path: pathlib.Path) -> pathlib.Path: + """Populate a temporary directory with test files.""" + (tmp_path / "signme-1").write_text("signme-1") + (tmp_path / "signme-2").write_text("signme-2") + (tmp_path / ".gitignore").write_text(".foo") + (tmp_path / "ignored").write_text("ignored") + return tmp_path