Skip to content
This repository was archived by the owner on Jan 23, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from qemu.qmp.protocol import ConnectError, Runstate

from jumpstarter.driver import Driver, export
from jumpstarter.streams.encoding import AutoDecompressIterator


def _vsock_available():
Expand All @@ -42,9 +43,15 @@ class QemuFlasher(FlasherInterface, Driver):

@export
async def flash(self, source, partition: str | None = None):
    """Flash an image to the specified partition.

    Supports transparent decompression of gzip, xz, bz2, and zstd compressed images.
    Compression format is auto-detected from file signature.

    Args:
        source: Image source handle; it is opened through ``self.resource``.
        partition: Partition name, validated by ``self.parent.validate_partition``;
            the validated result is the path that gets written.
    """
    async with await FileWriteStream.from_path(self.parent.validate_partition(partition)) as stream:
        async with self.resource(source) as res:
            # Iterate the resource exactly once, through the auto-decompressing
            # wrapper. Looping over `res` directly as well would consume the
            # first chunk (the one carrying the file signature) before
            # detection could see it.
            async for chunk in AutoDecompressIterator(source=res.__aiter__()):
                await stream.send(chunk)

@export
Expand Down
117 changes: 116 additions & 1 deletion packages/jumpstarter/jumpstarter/streams/encoding.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
import lzma
import sys
import zlib
from dataclasses import dataclass
from collections.abc import AsyncIterator
from dataclasses import dataclass, field
from enum import StrEnum
from typing import Any, Callable, Mapping

Expand All @@ -22,6 +23,55 @@ class Compression(StrEnum):
ZSTD = "zstd"


@dataclass(frozen=True)
class FileSignature:
    """Immutable pairing of a magic-byte prefix with the compression format it identifies."""

    # Leading bytes that data in this format starts with.
    signature: bytes
    # Compression format indicated by that prefix.
    compression: Compression


# Magic-byte prefixes used to recognise each supported compression format.
# Entries are checked in order and the first matching prefix wins.
# Reference: https://file-extension.net/seeker/
COMPRESSION_SIGNATURES: tuple[FileSignature, ...] = (
    FileSignature(signature=b"\x1f\x8b\x08", compression=Compression.GZIP),
    FileSignature(signature=b"\xfd\x37\x7a\x58\x5a\x00", compression=Compression.XZ),
    FileSignature(signature=b"\x42\x5a\x68", compression=Compression.BZ2),
    FileSignature(signature=b"\x28\xb5\x2f\xfd", compression=Compression.ZSTD),
)

# Number of leading bytes to buffer before attempting detection; the longest
# signature above is 6 bytes, so 8 leaves a small margin.
SIGNATURE_BUFFER_SIZE = 8


def detect_compression_from_signature(data: bytes) -> Compression | None:
    """Identify the compression format from the leading bytes of a file or stream.

    Args:
        data: The beginning of the file/stream. Supplying at least
            SIGNATURE_BUFFER_SIZE bytes is recommended so every known
            signature can be compared in full.

    Returns:
        The Compression member whose signature prefixes ``data`` (the first
        match in COMPRESSION_SIGNATURES order), or None when no signature
        matches, i.e. the data is uncompressed or in an unknown format.
    """
    matching_formats = (
        entry.compression for entry in COMPRESSION_SIGNATURES if data.startswith(entry.signature)
    )
    return next(matching_formats, None)


def create_decompressor(compression: Compression) -> Any:
"""Create a decompressor object for the given compression type."""
match compression:
case Compression.GZIP:
return zlib.decompressobj(wbits=47) # Auto-detect gzip/zlib
case Compression.XZ:
return lzma.LZMADecompressor()
case Compression.BZ2:
return bz2.BZ2Decompressor()
case Compression.ZSTD:
return zstd.ZstdDecompressor()


@dataclass(kw_only=True)
class CompressedStream(ObjectStream[bytes]):
stream: AnyByteStream
Expand Down Expand Up @@ -99,3 +149,68 @@ def compress_stream(stream: AnyByteStream, compression: Compression | None) -> A
compressor=zstd.ZstdCompressor(),
decompressor=zstd.ZstdDecompressor(),
)


@dataclass(kw_only=True)
class AutoDecompressIterator(AsyncIterator[bytes]):
    """An async iterator that auto-detects and decompresses compressed data.

    This wraps an async iterator of bytes and transparently decompresses
    gzip, xz, bz2, or zstd compressed data based on file signature detection.
    Data without a recognised signature passes through unchanged.

    Behaviour for compressed input:

    * Chunks that produce no decompressed output (for example a chunk holding
      only header bytes) are skipped rather than yielded as ``b""``.
    * Concatenated streams (multi-member gzip, multi-stream xz/bz2,
      multi-frame zstd) are decompressed back to back.
    * Bytes following a complete stream that do not start another valid
      stream are treated as trailing padding and ignored.
    * ``EOFError`` is raised if the source ends before the end-of-stream
      marker, so a truncated image is never mistaken for a complete one.
    """

    source: AsyncIterator[bytes]
    _decompressor: Any = field(init=False, default=None)
    _detected: bool = field(init=False, default=False)
    _buffer: bytes = field(init=False, default=b"")
    _exhausted: bool = field(init=False, default=False)
    # Detected Compression member (None for pass-through); kept so a fresh
    # decompressor can be created for each concatenated stream.
    _compression: Any = field(init=False, default=None)
    # Set once compressed output is complete (clean end or trailing padding).
    _finished: bool = field(init=False, default=False)

    async def _detect_compression(self) -> None:
        """Buffer the head of the stream and select a decompressor from its signature."""
        while len(self._buffer) < SIGNATURE_BUFFER_SIZE and not self._exhausted:
            try:
                self._buffer += await self.source.__anext__()
            except StopAsyncIteration:
                self._exhausted = True

        self._compression = detect_compression_from_signature(self._buffer)
        if self._compression is not None:
            self._decompressor = create_decompressor(self._compression)

        self._detected = True

    async def _next_raw_chunk(self) -> bytes | None:
        """Return the next undecoded chunk, or None once the source is exhausted.

        Bytes buffered during detection are handed out before the source is
        read again.
        """
        if self._buffer:
            head, self._buffer = self._buffer, b""
            return head
        if self._exhausted:
            return None
        try:
            return await self.source.__anext__()
        except StopAsyncIteration:
            self._exhausted = True
            return None

    def _decompress(self, data: bytes) -> bytes:
        """Decompress one raw chunk, continuing into concatenated streams."""
        pieces = []
        while data:
            if self._decompressor.eof:
                # The previous stream is complete. Feeding a finished
                # decompressor would raise EOFError (lzma/bz2/zstd) or silently
                # discard the data (zlib), so start a new stream instead.
                fresh = create_decompressor(self._compression)
                try:
                    output = fresh.decompress(data)
                except (zlib.error, lzma.LZMAError, OSError, zstd.ZstdError):
                    # Not another valid stream: trailing padding, ignore it.
                    self._finished = True
                    break
                self._decompressor = fresh
            else:
                output = self._decompressor.decompress(data)
            pieces.append(output)
            # Anything left over after an end-of-stream marker may begin
            # another stream; otherwise the whole chunk has been consumed.
            data = self._decompressor.unused_data if self._decompressor.eof else b""
        return b"".join(pieces)

    async def __anext__(self) -> bytes:
        """Return the next chunk of (decompressed) data.

        Raises:
            StopAsyncIteration: When the data is exhausted.
            EOFError: When compressed input ends before its end-of-stream marker.
        """
        # First call: detect compression format
        if not self._detected:
            await self._detect_compression()

        # Pass-through: hand out raw chunks unchanged.
        if self._decompressor is None:
            raw = await self._next_raw_chunk()
            if raw is None:
                raise StopAsyncIteration
            return raw

        # Compressed: keep reading until some output is available.
        while not self._finished:
            raw = await self._next_raw_chunk()
            if raw is None:
                if not self._decompressor.eof:
                    raise EOFError("Compressed stream ended before the end-of-stream marker was reached")
                self._finished = True
                break
            output = self._decompress(raw)
            if output:
                return output
        raise StopAsyncIteration

    def __aiter__(self) -> AsyncIterator[bytes]:
        return self