Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@
from anyio import fail_after, run_process, sleep
from anyio.streams.file import FileReadStream, FileWriteStream
from jumpstarter_driver_network.driver import TcpNetwork, UnixNetwork, VsockNetwork
from jumpstarter_driver_opendal.driver import FlasherInterface
from jumpstarter_driver_power.driver import PowerInterface, PowerReading
from jumpstarter_driver_pyserial.driver import PySerial
from pydantic import BaseModel, ByteSize, Field, TypeAdapter, ValidationError, validate_call
from qemu.qmp import QMPClient
from qemu.qmp.protocol import ConnectError, Runstate

from jumpstarter.driver import Driver, export
from jumpstarter.driver import Driver, FlasherInterface, export
from jumpstarter.streams.encoding import AutoDecompressIterator


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import pytest
import requests
from opendal import Operator

from jumpstarter_driver_qemu.driver import Qemu

Expand Down Expand Up @@ -77,8 +76,7 @@ def test_driver_qemu(tmp_path, ovmf):
qemu.flasher.flash(cached_image.resolve())
else:
qemu.flasher.flash(
f"pub/fedora/linux/releases/43/Cloud/{arch}/images/Fedora-Cloud-Base-Generic-43-1.6.{arch}.qcow2",
operator=Operator("http", endpoint="https://download.fedoraproject.org"),
f"https://download.fedoraproject.org/pub/fedora/linux/releases/43/Cloud/{arch}/images/Fedora-Cloud-Base-Generic-43-1.6.{arch}.qcow2",
)

qemu.power.on()
Expand Down
2 changes: 0 additions & 2 deletions python/packages/jumpstarter-driver-qemu/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ dependencies = [
"jumpstarter",
"jumpstarter-driver-composite",
"jumpstarter-driver-network",
"jumpstarter-driver-opendal",
"jumpstarter-driver-power",
"jumpstarter-driver-pyserial",
"pyyaml>=6.0.2",
Expand All @@ -38,7 +37,6 @@ source = "vcs"
raw-options = { 'root' = '../../../' }

[tool.uv.sources]
jumpstarter-driver-opendal = { workspace = true }
jumpstarter-driver-composite = { workspace = true }
jumpstarter-driver-network = { workspace = true }
jumpstarter-driver-pyserial = { workspace = true }
Expand Down
3 changes: 2 additions & 1 deletion python/packages/jumpstarter/jumpstarter/client/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from .base import DriverClient
from .client import client_from_path
from .flasher import FlasherClient, FlasherClientInterface
from .lease import DirectLease, Lease

__all__ = ["DriverClient", "DirectLease", "client_from_path", "Lease"]
__all__ = ["DriverClient", "DirectLease", "FlasherClient", "FlasherClientInterface", "client_from_path", "Lease"]
278 changes: 278 additions & 0 deletions python/packages/jumpstarter/jumpstarter/client/flasher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,278 @@
from __future__ import annotations

from abc import ABCMeta, abstractmethod
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from os import PathLike
from pathlib import Path
from typing import Any, Callable, Mapping, cast

import click
from anyio import BrokenResourceError, EndOfStream
from anyio.abc import ObjectStream

from jumpstarter.client import DriverClient
from jumpstarter.client.adapters import blocking
from jumpstarter.client.decorators import driver_click_group
from jumpstarter.common.resources import PresignedRequestResource
from jumpstarter.streams.encoding import Compression
from jumpstarter.streams.progress import ProgressAttribute

PathBuf = str | PathLike


@dataclass(kw_only=True)
class _AsyncIteratorStream(ObjectStream[bytes]):
"""Wraps an async iterator as an ObjectStream for resource_async."""

iterator: Any
total: int | None = None

async def receive(self) -> bytes:
try:
return await self.iterator.__anext__()
except StopAsyncIteration:
raise EndOfStream from None

async def send(self, item: bytes):
raise BrokenResourceError("read-only stream")

async def send_eof(self):
pass

async def aclose(self):
await self.iterator.aclose()

@property
def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
if self.total is not None and self.total > 0:
return {ProgressAttribute.total: lambda: float(self.total)}
return {}


@dataclass(kw_only=True)
class _FileWriteObjectStream(ObjectStream[bytes]):
"""Wraps a file path as a writable ObjectStream for resource_async."""

path: Path
_file: Any = field(default=None, init=False)

async def receive(self) -> bytes:
raise EndOfStream

async def send(self, item: bytes):
if self._file is None:
import anyio

self._file = await anyio.open_file(self.path, "wb")
await self._file.write(item)

async def send_eof(self):
if self._file is not None:
await self._file.aclose()
self._file = None

async def aclose(self):
if self._file is not None:
await self._file.aclose()
self._file = None


def _parse_path(path: PathBuf) -> tuple[Path | None, str | None]:
"""Parse a path into either a local Path or an HTTP URL.

Returns (local_path, None) for local files, or (None, url) for HTTP URLs.
"""
path_str = str(path)
if path_str.startswith(("http://", "https://")):
return None, path_str
return Path(path).resolve(), None


@blocking
@asynccontextmanager
async def _local_file_adapter(
*,
client: DriverClient,
path: Path,
mode: str = "rb",
compression: Compression | None = None,
):
"""Stream a local file via resource_async, without opendal."""
import anyio

if mode == "rb":
# Read mode: stream file content to exporter
file_size = path.stat().st_size

async def file_reader():
async with await anyio.open_file(path, "rb") as f:
while True:
chunk = await f.read(65536)
if not chunk:
break
yield chunk

stream = _AsyncIteratorStream(
iterator=file_reader(),
total=file_size,
)

async with client.resource_async(stream, content_encoding=compression) as res:
yield res
else:
# Write mode: receive content from exporter into file
stream = _FileWriteObjectStream(path=path)
async with client.resource_async(stream, content_encoding=compression) as res:
yield res


@blocking
@asynccontextmanager
async def _http_url_adapter(
*,
client: DriverClient,
url: str,
mode: str = "rb",
):
"""Create a PresignedRequestResource for an HTTP URL.

The exporter already handles HTTP downloads via aiohttp,
so we just pass the URL as a presigned GET request.
"""
if mode == "rb":
yield PresignedRequestResource(
headers={},
url=url,
method="GET",
).model_dump(mode="json")
else:
yield PresignedRequestResource(
headers={},
url=url,
method="PUT",
).model_dump(mode="json")


class FlasherClientInterface(metaclass=ABCMeta):
@abstractmethod
def flash(
self,
path: PathBuf | dict[str, PathBuf],
*,
target: str | None = None,
compression: Compression | None = None,
):
"""Flash image to DUT"""
...

@abstractmethod
def dump(
self,
path: PathBuf,
*,
target: str | None = None,
compression: Compression | None = None,
):
"""Dump image from DUT"""
...

def cli(self):
@driver_click_group(self)
def base():
"""Generic flasher interface"""
pass

@base.command()
@click.argument("file", nargs=-1, required=False)
@click.option(
"--target",
"-t",
"target_specs",
multiple=True,
help="name:file",
)
@click.option("--compression", type=click.Choice(Compression, case_sensitive=False))
def flash(file, target_specs, compression):
if target_specs:
mapping: dict[str, str] = {}
for spec in target_specs:
if ":" not in spec:
raise click.ClickException(f"Invalid target spec '{spec}', expected name:file")
name, img = spec.split(":", 1)
mapping[name] = img
self.flash(cast(dict[str, PathBuf], mapping), compression=compression)
return

if not file:
raise click.ClickException("FILE argument is required unless --target/-t is used")

self.flash(file[0], target=None, compression=compression)

@base.command()
@click.argument("file")
@click.option("--target", type=str)
@click.option("--compression", type=click.Choice(Compression, case_sensitive=False))
def dump(file, target, compression):
"""Dump image from DUT to file"""
self.dump(file, target=target, compression=compression)

return base


class FlasherClient(FlasherClientInterface, DriverClient):
def _flash_single(
self,
image: PathBuf,
*,
target: str | None,
compression: Compression | None,
):
"""Flash image to DUT"""
local_path, url = _parse_path(image)

if url is not None:
# HTTP URL: pass as presigned request for exporter-side download
with _http_url_adapter(client=self, url=url, mode="rb") as handle:
return self.call("flash", handle, target)
else:
# Local file: stream via resource_async
with _local_file_adapter(client=self, path=local_path, mode="rb", compression=compression) as handle:
return self.call("flash", handle, target)

def flash(
self,
path: PathBuf | dict[str, PathBuf],
*,
target: str | None = None,
compression: Compression | None = None,
):
if isinstance(path, dict):
if target is not None:
from jumpstarter.common.exceptions import ArgumentError

raise ArgumentError("'target' parameter is not valid when flashing multiple images")

results: dict[str, object] = {}
for part, img in path.items():
results[part] = self._flash_single(img, target=part, compression=compression)
return results

return self._flash_single(path, target=target, compression=compression)

def dump(
self,
path: PathBuf,
*,
target: str | None = None,
compression: Compression | None = None,
):
"""Dump image from DUT"""
local_path, url = _parse_path(path)

if url is not None:
with _http_url_adapter(client=self, url=url, mode="wb") as handle:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we testing all paths in here: https://github.com/jumpstarter-dev/jumpstarter/blob/main/python/packages/jumpstarter-driver-qemu/jumpstarter_driver_qemu/driver_test.py#L79 ?

I see it uses the Operator, can we make sure that we take that and interpret that for compatibility purposes without really relying/using opendal?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. The current test covers two paths:

  1. Local file path (cached_image.resolve()) -- this exercises the _local_file_adapter code path in the new FlasherClient, streaming the file via resource_async.
  2. HTTP URL (the Fedora download URL) -- this was previously using Operator("http", endpoint=...) and is now a direct https:// URL, which exercises the _http_url_adapter code path using PresignedRequestResource.

Both paths in the new FlasherClient are covered by the integration test.

Regarding backward compatibility with opendal.Operator: the opendal package's FlasherClient (in jumpstarter_driver_opendal.client) is unchanged and still accepts the operator= parameter. Drivers that depend on jumpstarter-driver-opendal (esp32, pi-pico, dutlink, sdwire) continue to use that client via their own FlasherInterface.client() classmethod which points to the opendal version.

The QEMU driver now uses the core FlasherInterface which points to the simplified FlasherClient -- this one intentionally does not accept an operator= parameter since the goal is to avoid the opendal dependency entirely. The Operator usage in the old test was only needed because the old flasher client required it for HTTP downloads; the new client handles HTTP URLs natively.

So the API surface for QEMU users changes from:

# Old (required opendal)
qemu.flasher.flash("path/on/server", operator=Operator("http", endpoint="https://example.com"))

# New (no opendal needed)
qemu.flasher.flash("https://example.com/path/on/server")

The new API is simpler and doesn't break any existing QEMU usage since the operator= parameter was only used in this test, not in production QEMU workflows.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, but if that test is there, is not executing. Should we make sure to have a test that executes the full url download?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right -- when the cached image exists (which it does in CI), the HTTP URL branch is never exercised.

I'll add a dedicated unit test for the _http_url_adapter and _parse_path helpers in the core jumpstarter package to make sure the URL code path is covered without needing a full Fedora image download. This will verify that:

  1. _parse_path correctly distinguishes local paths from HTTP/HTTPS URLs
  2. _http_url_adapter produces a proper PresignedRequestResource for HTTP URLs
  3. The FlasherClient._flash_single method routes HTTP URLs through the presigned request path

For a full end-to-end HTTP download test, that would involve downloading a large image in CI which is expensive. I think a unit test covering the routing logic is the right trade-off here. Does that work for you?

return self.call("dump", handle, target)
else:
with _local_file_adapter(client=self, path=local_path, mode="wb", compression=compression) as handle:
return self.call("dump", handle, target)
Loading
Loading