Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES/2237.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Added support for PQC signatures.

4 changes: 4 additions & 0 deletions pulp_container/app/tasks/sign.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ async def create_signature(manifest, reference, signing_service):
encoded_sig = base64.b64encode(data).decode()
sig_digest = hashlib.sha256(data).hexdigest()
sig_json = extract_data_from_signature(data, manifest.digest)
if sig_json is None:
raise RuntimeError(
f"Failed to extract signature data for {manifest.digest}"
)
manifest_digest = sig_json["critical"]["image"]["docker-manifest-digest"]

signature = ManifestSignature(
Expand Down
137 changes: 117 additions & 20 deletions pulp_container/app/utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
import base64
import hashlib
import fnmatch
import re
import subprocess
import gnupg
import json
import logging
import time
Expand All @@ -15,6 +12,7 @@
from functools import partial
from rest_framework.exceptions import Throttled

from pulpcore.app.openpgp import packet_iter, subpacket_iter
from pulpcore.plugin.models import Artifact, Task
from pulpcore.plugin.util import get_domain

Expand All @@ -32,9 +30,6 @@
SIGNATURE_SCHEMA,
)

KEY_ID_REGEX_COMPILED = re.compile(r"keyid ([0-9A-F]+)")
TIMESTAMP_REGEX_COMPILED = re.compile(r"created ([0-9]+)")

signature_validator = Draft7Validator(SIGNATURE_SCHEMA)

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -86,10 +81,113 @@ def urlpath_sanitize(*args):
return "/".join(segments)


_TAG_ONE_PASS_SIG = 4
_TAG_LITERAL_DATA = 11
_TAG_SIGNATURE = 2


def _extract_sig_metadata(subpacket_data):
"""Extract key_id and timestamp from signature subpacket bytes."""
result = {}
for sp in subpacket_iter(subpacket_data):
if sp["type"] == 2 and len(sp["body"]) >= 4:
result["timestamp"] = int.from_bytes(sp["body"][:4], "big")
elif sp["type"] == 16 and len(sp["body"]) >= 8:
result["key_id"] = sp["body"][:8].hex().upper()
elif sp["type"] == 33 and len(sp["body"]) >= 5:
result.setdefault("key_id", sp["body"][-8:].hex().upper())
return result


def _extract_inline_sig_data(signature_raw):
"""
Parse an OpenPGP inline-signed message and extract its content and metadata.

Handles both v4 (RFC 4880) and v6 (RFC 9580 / PQC) packet formats.

Returns:
tuple: (literal_data_bytes, key_id_hex, timestamp_int) or None on failure.

"""
literal_data = None
key_id = None
timestamp = None

try:
for packet in packet_iter(signature_raw):
tag = packet["type"]
body = packet["body"]

if tag == _TAG_ONE_PASS_SIG:
version = body[0]
if version == 3 and len(body) >= 13:
key_id = body[4:12].hex().upper()
elif version == 6 and len(body) >= 7:
# v6 OPS: type(1) + hash(1) + pubkey(1) + salt_len(1) + salt(N)
# + fpr_len(1) + fingerprint(N) + nested(1)
salt_len = body[4]
fpr_offset = 5 + salt_len
if fpr_offset < len(body):
fpr_len = body[fpr_offset]
fpr = body[fpr_offset + 1 : fpr_offset + 1 + fpr_len]
key_id = fpr[-8:].hex().upper() if len(fpr) >= 8 else None

elif tag == _TAG_LITERAL_DATA:
fname_len = body[1]
literal_data = body[6 + fname_len :]

elif tag == _TAG_SIGNATURE:
version = body[0]
if version in (4, 5) and len(body) >= 6:
hashed_len = (body[4] << 8) + body[5]
hashed_data = body[6 : 6 + hashed_len]
unhashed_start = 6 + hashed_len
unhashed_len = (body[unhashed_start] << 8) + body[unhashed_start + 1]
unhashed_data = body[unhashed_start + 2 : unhashed_start + 2 + unhashed_len]
sp = {
**_extract_sig_metadata(unhashed_data),
**_extract_sig_metadata(hashed_data),
}
timestamp = timestamp or sp.get("timestamp")
key_id = key_id or sp.get("key_id")
elif version == 6 and len(body) >= 8:
hashed_len = int.from_bytes(body[4:8], "big")
hashed_data = body[8 : 8 + hashed_len]
unhashed_start = 8 + hashed_len
unhashed_len = int.from_bytes(
body[unhashed_start : unhashed_start + 4], "big"
)
unhashed_data = body[unhashed_start + 4 : unhashed_start + 4 + unhashed_len]
sp = {
**_extract_sig_metadata(unhashed_data),
**_extract_sig_metadata(hashed_data),
}
timestamp = timestamp or sp.get("timestamp")
key_id = key_id or sp.get("key_id")
except (ValueError, IndexError, NotImplementedError) as exc:
log.info("Failed to parse OpenPGP packets: %s", exc)
return None

if literal_data is None or key_id is None or timestamp is None:
log.info(
"Incomplete OpenPGP inline-signed message "
"(literal_data=%s, key_id=%s, timestamp=%s)",
literal_data is not None,
key_id,
timestamp,
)
return None

return literal_data, key_id, timestamp


def extract_data_from_signature(signature_raw, man_digest):
"""
Extract data from an "integrated" signature, aka a signed non-encrypted document.

Parses the OpenPGP inline-signed message directly, supporting both v4 (RFC 4880)
and v6 (RFC 9580) packet formats including post-quantum cryptography signatures.

Args:
signature_raw(bytes): A signed doc to get data from
man_digest (str): A manifest digest for which the signature is for
Expand All @@ -98,20 +196,23 @@ def extract_data_from_signature(signature_raw, man_digest):
dict: JSON representation of the document and available data about signature

"""
gpg = gnupg.GPG()
crypt_obj = gpg.decrypt(signature_raw, extra_args=["--skip-verify"])
if not crypt_obj.data:
parsed = _extract_inline_sig_data(signature_raw)
if parsed is None:
log.info(
"It is not possible to read the signed document, GPG error: {}".format(crypt_obj.stderr)
"It is not possible to read the signed document for %s",
man_digest,
)
return

literal_data, key_id, timestamp = parsed

try:
sig_json = json.loads(crypt_obj.data)
sig_json = json.loads(literal_data)
except Exception as exc:
log.info(
"Signed document cannot be parsed to create a signature for {}."
" Error: {}".format(man_digest, str(exc))
"Signed document cannot be parsed to create a signature for %s. Error: %s",
man_digest,
exc,
)
return

Expand All @@ -120,15 +221,11 @@ def extract_data_from_signature(signature_raw, man_digest):
errors.append(f'{".".join(error.path)}: {error.message}')

if errors:
log.info("The signature for {} is not synced due to: {}".format(man_digest, errors))
log.info("The signature for %s is not synced due to: %s", man_digest, errors)
return

# decrypted and unverified signatures do not have prepopulated the key_id and timestamp
# fields; thus, it is necessary to use the debugging utilities of gpg to extract these
# fields since they are not encrypted and still readable without decrypting the signature first
packets = subprocess.check_output(["gpg", "--list-packets"], input=signature_raw).decode()
sig_json["signing_key_id"] = KEY_ID_REGEX_COMPILED.search(packets).group(1)
sig_json["signature_timestamp"] = TIMESTAMP_REGEX_COMPILED.search(packets).group(1)
sig_json["signing_key_id"] = key_id
sig_json["signature_timestamp"] = timestamp

return sig_json

Expand Down
Loading