Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions python/packages/jumpstarter-driver-iscsi/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ export:
root_dir: "/var/lib/iscsi"
target_name: "demo"
remove_created_on_close: false # Keep disk images persistent (default)
block_device_allowlist: # Required to use is_block=True LUNs
- /dev/sda
- /dev/disk/by-id/my-disk
# When size_mb is 0 a pre-existing file size is used.
```

Expand All @@ -55,6 +58,7 @@ export:
| `host` | IP address to bind the target to. Empty string will auto-detect | str | no | _auto_ |
| `port` | TCP port the target listens on | int | no | `3260` |
| `remove_created_on_close`| Automatically remove created files/directories when driver closes| bool | no | false |
| `block_device_allowlist`| List of allowed block device paths for `is_block=True` LUNs. Symlinks are resolved before matching. Must be set to use block devices. | list[str] | no | `[]` (empty -- block devices disabled) |

### File Management

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from typing import Any, Dict, List, Optional

from jumpstarter_driver_opendal.driver import Opendal
from pydantic import validate_call
from rtslib_fb import LUN, TPG, BlockStorageObject, FileIOStorageObject, NetworkPortal, RTSRoot, Target

from jumpstarter.driver import Driver, export
Expand Down Expand Up @@ -47,6 +48,7 @@ class ISCSI(Driver):
host: str = field(default="")
port: int = 3260
remove_created_on_close: bool = False # Keep disk images persistent by default
block_device_allowlist: List[str] = field(default_factory=list)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this parameter would need to be documented in the README.md of the driver.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! I've added block_device_allowlist to both the config parameters table and the YAML example in the driver README (commit 9232932).


_rtsroot: Optional[RTSRoot] = field(init=False, default=None)
_target: Optional[Target] = field(init=False, default=None)
Expand Down Expand Up @@ -186,6 +188,7 @@ def _cleanup_orphan_storage_objects(self):
self.logger.debug(f"No orphan storage object cleanup performed: {e}")

@export
@validate_call
def clear_all_luns(self):
"""Remove all existing LUNs and their backstores, including any orphans under root_dir"""
if self._tpg is None:
Expand All @@ -197,6 +200,7 @@ def clear_all_luns(self):
self._cleanup_orphan_storage_objects()

@export
@validate_call
def start(self):
"""Start the iSCSI target server

Expand All @@ -212,6 +216,7 @@ def start(self):
raise ISCSIError(f"Failed to start iSCSI target server: {e}") from e

@export
@validate_call
def stop(self):
"""Stop the iSCSI target server

Expand All @@ -232,6 +237,7 @@ def stop(self):
raise ISCSIError(f"Failed to stop iSCSI target: {e}") from e

@export
@validate_call
def get_host(self) -> str:
"""Get the host address the server is bound to

Expand All @@ -241,6 +247,7 @@ def get_host(self) -> str:
return self.host

@export
@validate_call
def get_port(self) -> int:
"""Get the port number the server is listening on

Expand All @@ -250,6 +257,7 @@ def get_port(self) -> int:
return self.port

@export
@validate_call
def get_target_iqn(self) -> str:
"""Get the IQN of the target

Expand All @@ -272,7 +280,17 @@ def _get_full_path(self, file_path: str, is_block: bool) -> str:
if is_block:
if not os.path.isabs(file_path):
raise ISCSIError("For block devices, file_path must be an absolute path")
return file_path
resolved_path = os.path.realpath(file_path)
if not self.block_device_allowlist:
raise ISCSIError(
"block_device_allowlist is empty; configure allowed block device paths "
"to use is_block=True"
)
if resolved_path not in self.block_device_allowlist:
raise ISCSIError(
f"Block device path '{resolved_path}' is not in the configured allowlist"
)
return resolved_path
else:
normalized_path = os.path.normpath(file_path)

Expand Down Expand Up @@ -310,6 +328,7 @@ def _check_no_symlinks_in_path(self, path: str) -> None:
path_to_check = os.path.dirname(path_to_check)

@export
@validate_call
def decompress(self, src_path: str, dst_path: str, algo: str) -> None:
"""Decompress a file under storage root into another path under storage root.

Expand Down Expand Up @@ -383,6 +402,7 @@ def _create_file_storage_object(self, name: str, full_path: str, size_mb: int) -
return FileIOStorageObject(name, full_path, size=size_bytes), size_mb

@export
@validate_call
def add_lun(self, name: str, file_path: str, size_mb: int = 0, is_block: bool = False) -> str:
"""
Add a new LUN to the iSCSI target.
Expand Down Expand Up @@ -427,6 +447,7 @@ def add_lun(self, name: str, file_path: str, size_mb: int = 0, is_block: bool =
raise ISCSIError(f"Failed to add LUN: {e}") from e

@export
@validate_call
def remove_lun(self, name: str):
"""Remove a LUN from the iSCSI target

Expand Down Expand Up @@ -455,6 +476,7 @@ def remove_lun(self, name: str):
raise ISCSIError(f"Failed to remove LUN: {e}") from e

@export
@validate_call
def list_luns(self) -> List[Dict[str, Any]]:
"""List all configured LUNs

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
"""Tests for iSCSI driver block device allowlist and path confinement."""

import os
import sys

import pytest

if sys.platform != "linux":
pytest.skip("iSCSI driver requires Linux (libudev)", allow_module_level=True)

from jumpstarter_driver_iscsi.driver import ISCSI, ISCSIError


@pytest.fixture
def tmp_root(tmp_path):
return str(tmp_path)


def _make_driver(tmp_root, block_device_allowlist=None):
"""Create an ISCSI driver instance without initializing rtslib."""
driver = object.__new__(ISCSI)
driver.root_dir = tmp_root
driver.block_device_allowlist = block_device_allowlist or []
return driver


class TestGetFullPathBlock:
"""Tests for _get_full_path with is_block=True."""

def test_block_empty_allowlist_rejects(self, tmp_root):
driver = _make_driver(tmp_root, block_device_allowlist=[])
with pytest.raises(ISCSIError, match="block_device_allowlist is empty"):
driver._get_full_path("/dev/sda", is_block=True)

def test_block_not_in_allowlist_rejects(self, tmp_root):
driver = _make_driver(tmp_root, block_device_allowlist=["/dev/sdb"])
with pytest.raises(ISCSIError, match="not in the configured allowlist"):
driver._get_full_path("/dev/sda", is_block=True)

def test_block_in_allowlist_accepted(self, tmp_root):
driver = _make_driver(tmp_root, block_device_allowlist=["/dev/sda"])
result = driver._get_full_path("/dev/sda", is_block=True)
assert result == "/dev/sda"

def test_block_relative_path_rejected(self, tmp_root):
driver = _make_driver(tmp_root, block_device_allowlist=["/dev/sda"])
with pytest.raises(ISCSIError, match="must be an absolute path"):
driver._get_full_path("dev/sda", is_block=True)

def test_block_symlink_resolved(self, tmp_root):
"""Symlinks are resolved before checking the allowlist."""
real_dev = os.path.join(tmp_root, "real_device")
link_path = os.path.join(tmp_root, "link_device")
# Create a real file and symlink
with open(real_dev, "w") as f:
f.write("")
os.symlink(real_dev, link_path)

driver = _make_driver(tmp_root, block_device_allowlist=[real_dev])
result = driver._get_full_path(link_path, is_block=True)
assert result == real_dev

def test_block_symlink_not_in_allowlist(self, tmp_root):
"""Symlink target not in allowlist should be rejected."""
real_dev = os.path.join(tmp_root, "real_device")
link_path = os.path.join(tmp_root, "link_device")
with open(real_dev, "w") as f:
f.write("")
os.symlink(real_dev, link_path)

driver = _make_driver(tmp_root, block_device_allowlist=[link_path])
with pytest.raises(ISCSIError, match="not in the configured allowlist"):
driver._get_full_path(link_path, is_block=True)


class TestGetFullPathFile:
"""Tests for _get_full_path with is_block=False (unchanged behavior)."""

def test_file_relative_path_confined(self, tmp_root):
driver = _make_driver(tmp_root)
result = driver._get_full_path("subdir/test.img", is_block=False)
assert result.startswith(tmp_root)

def test_file_absolute_path_rejected(self, tmp_root):
driver = _make_driver(tmp_root)
with pytest.raises(ISCSIError, match="Invalid file path"):
driver._get_full_path("/etc/passwd", is_block=False)

def test_file_traversal_rejected(self, tmp_root):
driver = _make_driver(tmp_root)
with pytest.raises(ISCSIError, match="Invalid file path"):
driver._get_full_path("../../etc/passwd", is_block=False)
56 changes: 55 additions & 1 deletion python/uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading