diff --git a/CHANGES/2237.bugfix b/CHANGES/2237.bugfix new file mode 100644 index 000000000..4781bef71 --- /dev/null +++ b/CHANGES/2237.bugfix @@ -0,0 +1,2 @@ +Added support for PQC signatures. + diff --git a/pulp_container/app/tasks/sign.py b/pulp_container/app/tasks/sign.py index 279559e5a..561f74ecb 100644 --- a/pulp_container/app/tasks/sign.py +++ b/pulp_container/app/tasks/sign.py @@ -119,6 +119,10 @@ async def create_signature(manifest, reference, signing_service): encoded_sig = base64.b64encode(data).decode() sig_digest = hashlib.sha256(data).hexdigest() sig_json = extract_data_from_signature(data, manifest.digest) + if sig_json is None: + raise RuntimeError( + f"Failed to extract signature data for {manifest.digest}" + ) manifest_digest = sig_json["critical"]["image"]["docker-manifest-digest"] signature = ManifestSignature( diff --git a/pulp_container/app/utils.py b/pulp_container/app/utils.py index e28cd6b77..4a5753d87 100644 --- a/pulp_container/app/utils.py +++ b/pulp_container/app/utils.py @@ -1,9 +1,6 @@ import base64 import hashlib import fnmatch -import re -import subprocess -import gnupg import json import logging import time @@ -15,6 +12,7 @@ from functools import partial from rest_framework.exceptions import Throttled +from pulpcore.app.openpgp import packet_iter, subpacket_iter from pulpcore.plugin.models import Artifact, Task from pulpcore.plugin.util import get_domain @@ -32,9 +30,6 @@ SIGNATURE_SCHEMA, ) -KEY_ID_REGEX_COMPILED = re.compile(r"keyid ([0-9A-F]+)") -TIMESTAMP_REGEX_COMPILED = re.compile(r"created ([0-9]+)") - signature_validator = Draft7Validator(SIGNATURE_SCHEMA) log = logging.getLogger(__name__) @@ -86,10 +81,113 @@ def urlpath_sanitize(*args): return "/".join(segments) +_TAG_ONE_PASS_SIG = 4 +_TAG_LITERAL_DATA = 11 +_TAG_SIGNATURE = 2 + + +def _extract_sig_metadata(subpacket_data): + """Extract key_id and timestamp from signature subpacket bytes.""" + result = {} + for sp in subpacket_iter(subpacket_data): + if sp["type"] == 2 and len(sp["body"]) >= 4: + result["timestamp"] = int.from_bytes(sp["body"][:4], "big") + elif sp["type"] == 16 and len(sp["body"]) >= 8: + result["key_id"] = sp["body"][:8].hex().upper() + elif sp["type"] == 33 and len(sp["body"]) >= 5: + result.setdefault("key_id", sp["body"][-8:].hex().upper()) + return result + + +def _extract_inline_sig_data(signature_raw): + """ + Parse an OpenPGP inline-signed message and extract its content and metadata. + + Handles both v4 (RFC 4880) and v6 (RFC 9580 / PQC) packet formats. + + Returns: + tuple: (literal_data_bytes, key_id_hex, timestamp_int) or None on failure. + + """ + literal_data = None + key_id = None + timestamp = None + + try: + for packet in packet_iter(signature_raw): + tag = packet["type"] + body = packet["body"] + + if tag == _TAG_ONE_PASS_SIG: + version = body[0] + if version == 3 and len(body) >= 13: + key_id = body[4:12].hex().upper() + elif version == 6 and len(body) >= 7: + # v6 OPS: type(1) + hash(1) + pubkey(1) + salt_len(1) + salt(N) + # + fpr_len(1) + fingerprint(N) + nested(1) + salt_len = body[4] + fpr_offset = 5 + salt_len + if fpr_offset < len(body): + fpr_len = body[fpr_offset] + fpr = body[fpr_offset + 1 : fpr_offset + 1 + fpr_len] + key_id = fpr[-8:].hex().upper() if len(fpr) >= 8 else None + + elif tag == _TAG_LITERAL_DATA: + fname_len = body[1] + literal_data = body[6 + fname_len :] + + elif tag == _TAG_SIGNATURE: + version = body[0] + if version in (4, 5) and len(body) >= 6: + hashed_len = (body[4] << 8) + body[5] + hashed_data = body[6 : 6 + hashed_len] + unhashed_start = 6 + hashed_len + unhashed_len = (body[unhashed_start] << 8) + body[unhashed_start + 1] + unhashed_data = body[unhashed_start + 2 : unhashed_start + 2 + unhashed_len] + sp = { + **_extract_sig_metadata(unhashed_data), + **_extract_sig_metadata(hashed_data), + } + timestamp = timestamp or sp.get("timestamp") + key_id = key_id or sp.get("key_id") + elif version == 6 and len(body) >= 8: + hashed_len = int.from_bytes(body[4:8], "big") + hashed_data = body[8 : 8 + hashed_len] + unhashed_start = 8 + hashed_len + unhashed_len = int.from_bytes( + body[unhashed_start : unhashed_start + 4], "big" + ) + unhashed_data = body[unhashed_start + 4 : unhashed_start + 4 + unhashed_len] + sp = { + **_extract_sig_metadata(unhashed_data), + **_extract_sig_metadata(hashed_data), + } + timestamp = timestamp or sp.get("timestamp") + key_id = key_id or sp.get("key_id") + except (ValueError, IndexError, NotImplementedError) as exc: + log.info("Failed to parse OpenPGP packets: %s", exc) + return None + + if literal_data is None or key_id is None or timestamp is None: + log.info( + "Incomplete OpenPGP inline-signed message " + "(literal_data=%s, key_id=%s, timestamp=%s)", + literal_data is not None, + key_id, + timestamp, + ) + return None + + return literal_data, key_id, timestamp + + def extract_data_from_signature(signature_raw, man_digest): """ Extract data from an "integrated" signature, aka a signed non-encrypted document. + Parses the OpenPGP inline-signed message directly, supporting both v4 (RFC 4880) + and v6 (RFC 9580) packet formats including post-quantum cryptography signatures. + Args: signature_raw(bytes): A signed doc to get data from man_digest (str): A manifest digest for which the signature is for @@ -98,20 +196,23 @@ def extract_data_from_signature(signature_raw, man_digest): dict: JSON representation of the document and available data about signature """ - gpg = gnupg.GPG() - crypt_obj = gpg.decrypt(signature_raw, extra_args=["--skip-verify"]) - if not crypt_obj.data: + parsed = _extract_inline_sig_data(signature_raw) + if parsed is None: log.info( - "It is not possible to read the signed document, GPG error: {}".format(crypt_obj.stderr) + "It is not possible to read the signed document for %s", + man_digest, ) return + literal_data, key_id, timestamp = parsed + try: - sig_json = json.loads(crypt_obj.data) + sig_json = json.loads(literal_data) except Exception as exc: log.info( - "Signed document cannot be parsed to create a signature for {}." - " Error: {}".format(man_digest, str(exc)) + "Signed document cannot be parsed to create a signature for %s. Error: %s", + man_digest, + exc, ) return @@ -120,15 +221,11 @@ def extract_data_from_signature(signature_raw, man_digest): errors.append(f'{".".join(error.path)}: {error.message}') if errors: - log.info("The signature for {} is not synced due to: {}".format(man_digest, errors)) + log.info("The signature for %s is not synced due to: %s", man_digest, errors) return - # decrypted and unverified signatures do not have prepopulated the key_id and timestamp - # fields; thus, it is necessary to use the debugging utilities of gpg to extract these - # fields since they are not encrypted and still readable without decrypting the signature first - packets = subprocess.check_output(["gpg", "--list-packets"], input=signature_raw).decode() - sig_json["signing_key_id"] = KEY_ID_REGEX_COMPILED.search(packets).group(1) - sig_json["signature_timestamp"] = TIMESTAMP_REGEX_COMPILED.search(packets).group(1) + sig_json["signing_key_id"] = key_id + sig_json["signature_timestamp"] = timestamp return sig_json