diff --git a/music_assistant/controllers/config.py b/music_assistant/controllers/config.py index 239970e47e..c540691157 100644 --- a/music_assistant/controllers/config.py +++ b/music_assistant/controllers/config.py @@ -108,6 +108,7 @@ ) from music_assistant.helpers.api import api_command from music_assistant.helpers.json import JSON_DECODE_EXCEPTIONS, async_json_dumps, async_json_loads +from music_assistant.helpers.permissions import Permission from music_assistant.helpers.util import load_provider_module, validate_announcement_chime_url from music_assistant.models import ProviderModuleType from music_assistant.models.music_provider import MusicProvider @@ -475,7 +476,7 @@ async def get_provider_config_entries( # noqa: PLR0915 entry.value = values.get(entry.key, entry.default_value) return all_entries - @api_command("config/providers/save", required_role="admin") + @api_command("config/providers/save", required_permissions=[Permission.PROVIDERS_MANAGE]) async def save_provider_config( self, provider_domain: str, @@ -496,7 +497,7 @@ async def save_provider_config( # return full config, just in case return await self.get_provider_config(config.instance_id) - @api_command("config/providers/remove", required_role="admin") + @api_command("config/providers/remove", required_permissions=[Permission.PROVIDERS_MANAGE]) async def remove_provider_config(self, instance_id: str) -> None: """Remove ProviderConfig.""" conf_key = f"{CONF_PROVIDERS}/{instance_id}" @@ -796,7 +797,7 @@ def get_base_player_config(self, player_id: str, provider: str) -> PlayerConfig: } return cast("PlayerConfig", PlayerConfig.parse([], raw_conf)) - @api_command("config/players/save", required_role="admin") + @api_command("config/players/save", required_permissions=[Permission.PLAYERS_CONFIGURE]) async def save_player_config( self, player_id: str, values: dict[str, ConfigValueType] ) -> PlayerConfig: @@ -838,7 +839,7 @@ async def save_player_config( # return full player config (just in case) return await self.get_player_config(player_id) - @api_command("config/players/remove", required_role="admin") + @api_command("config/players/remove", required_permissions=[Permission.PLAYERS_CONFIGURE]) async def remove_player_config(self, player_id: str) -> None: """Remove PlayerConfig.""" conf_key = f"{CONF_PLAYERS}/{player_id}" @@ -930,7 +931,7 @@ def get_player_dsp_config(self, player_id: str) -> DSPConfig: dsp_config.enabled = False return dsp_config - @api_command("config/players/dsp/save", required_role="admin") + @api_command("config/players/dsp/save", required_permissions=[Permission.PLAYERS_CONFIGURE]) async def save_dsp_config(self, player_id: str, config: DSPConfig) -> DSPConfig: """ Save/update DSPConfig for a player. @@ -957,7 +958,7 @@ async def get_dsp_presets(self) -> list[DSPConfigPreset]: raw_presets = self.get(CONF_PLAYER_DSP_PRESETS, {}) return [DSPConfigPreset.from_dict(preset) for preset in raw_presets.values()] - @api_command("config/dsp_presets/save", required_role="admin") + @api_command("config/dsp_presets/save", required_permissions=[Permission.CONFIG_WRITE]) async def save_dsp_presets(self, preset: DSPConfigPreset) -> DSPConfigPreset: """ Save/update a user-defined DSP presets. @@ -982,7 +983,7 @@ async def save_dsp_presets(self, preset: DSPConfigPreset) -> DSPConfigPreset: return preset - @api_command("config/dsp_presets/remove", required_role="admin") + @api_command("config/dsp_presets/remove", required_permissions=[Permission.CONFIG_WRITE]) async def remove_dsp_preset(self, preset_id: str) -> None: """Remove a user-defined DSP preset.""" self.mass.config.remove(f"{CONF_PLAYER_DSP_PRESETS}/preset_{preset_id}") @@ -1152,7 +1153,7 @@ async def get_core_config_entries( entry.value = values.get(entry.key, entry.default_value) return all_entries - @api_command("config/core/save", required_role="admin") + @api_command("config/core/save", required_permissions=[Permission.CONFIG_WRITE]) async def save_core_config( self, domain: str, @@ -1469,7 +1470,7 @@ async def _async_save(self) -> None: await _file.write(await async_json_dumps(self._data, indent=True)) LOGGER.debug("Saved data to persistent storage") - @api_command("config/providers/reload", required_role="admin") + @api_command("config/providers/reload", required_permissions=[Permission.PROVIDERS_MANAGE]) async def _reload_provider(self, instance_id: str) -> None: """Reload provider.""" try: diff --git a/music_assistant/controllers/media/base.py b/music_assistant/controllers/media/base.py index 2954a45afb..7a1b4da710 100644 --- a/music_assistant/controllers/media/base.py +++ b/music_assistant/controllers/media/base.py @@ -34,6 +34,7 @@ from music_assistant.helpers.compare import compare_media_item, create_safe_string from music_assistant.helpers.database import UNSET from music_assistant.helpers.json import json_loads, serialize_to_json +from music_assistant.helpers.permissions import Permission from music_assistant.helpers.util import guard_single_request, parse_optional_bool if TYPE_CHECKING: @@ -124,10 +125,14 @@ def __init__(self, mass: MusicAssistant) -> None: f"music/{api_base}/get_{self.media_type}", self.get, alias=True ) self.mass.register_api_command( - f"music/{api_base}/update", self.update_item_in_library, required_role="admin" + f"music/{api_base}/update", + self.update_item_in_library, + required_permissions=[Permission.LIBRARY_WRITE], ) self.mass.register_api_command( - f"music/{api_base}/remove", self.remove_item_from_library, required_role="admin" + f"music/{api_base}/remove", + self.remove_item_from_library, + required_permissions=[Permission.LIBRARY_DELETE], ) self._db_add_lock = asyncio.Lock() diff --git a/music_assistant/controllers/media/genres.py b/music_assistant/controllers/media/genres.py index 14f5cd9939..b5125253b6 100644 --- a/music_assistant/controllers/media/genres.py +++ b/music_assistant/controllers/media/genres.py @@ -37,6 +37,7 @@ from music_assistant.helpers.compare import create_safe_string from music_assistant.helpers.database import UNSET from music_assistant.helpers.json import serialize_to_json +from music_assistant.helpers.permissions import Permission from .base import MediaControllerBase @@ -82,33 +83,39 @@ def __init__(self, mass: MusicAssistant) -> None: # register extra api handlers self.mass.register_api_command( - "music/genres/add_alias", self.add_alias, required_role="admin" + "music/genres/add_alias", + self.add_alias, + required_permissions=[Permission.LIBRARY_WRITE], ) self.mass.register_api_command( - "music/genres/remove_alias", self.remove_alias, required_role="admin" + "music/genres/remove_alias", + self.remove_alias, + required_permissions=[Permission.LIBRARY_WRITE], ) self.mass.register_api_command( - "music/genres/add_media_mapping", self.add_media_mapping, required_role="admin" + "music/genres/add_media_mapping", + self.add_media_mapping, + required_permissions=[Permission.LIBRARY_WRITE], ) self.mass.register_api_command( "music/genres/remove_media_mapping", self.remove_media_mapping, - required_role="admin", + required_permissions=[Permission.LIBRARY_WRITE], ) self.mass.register_api_command( "music/genres/promote_alias", self.promote_alias_to_genre, - required_role="admin", + required_permissions=[Permission.LIBRARY_WRITE], ) self.mass.register_api_command( "music/genres/restore_defaults", self.restore_default_genres, - required_role="admin", + required_permissions=[Permission.LIBRARY_WRITE], ) self.mass.register_api_command( "music/genres/add", self.add_item_to_library, - required_role="admin", + required_permissions=[Permission.LIBRARY_WRITE], ) self.mass.register_api_command( "music/genres/overview", @@ -121,7 +128,7 @@ def __init__(self, mass: MusicAssistant) -> None: self.mass.register_api_command( "music/genres/scan_mappings", self.scan_mappings, - required_role="admin", + required_permissions=[Permission.LIBRARY_WRITE], ) self.mass.register_api_command( "music/genres/scanner_status", @@ -134,7 +141,7 @@ def __init__(self, mass: MusicAssistant) -> None: self.mass.register_api_command( "music/genres/merge", self.merge_genres, - required_role="admin", + required_permissions=[Permission.LIBRARY_WRITE], ) # Run genre mapping scanner after library sync completes diff --git a/music_assistant/controllers/players/controller.py b/music_assistant/controllers/players/controller.py index e4621f60a4..8cf7ece22e 100644 --- a/music_assistant/controllers/players/controller.py +++ b/music_assistant/controllers/players/controller.py @@ -83,6 +83,7 @@ get_sendspin_player_id, ) from music_assistant.helpers.api import api_command +from music_assistant.helpers.permissions import Permission from music_assistant.helpers.tags import async_parse_tags from music_assistant.helpers.throttle_retry import Throttler from music_assistant.helpers.util import TaskManager, validate_announcement_chime_url @@ -1114,7 +1115,7 @@ async def cmd_ungroup_many(self, player_ids: list[str]) -> None: for player_id in list(player_ids): await self.cmd_ungroup(player_id) - @api_command("players/create_group_player", required_role="admin") + @api_command("players/create_group_player", required_permissions=[Permission.PLAYERS_CONFIGURE]) async def create_group_player( self, provider: str, name: str, members: list[str], dynamic: bool = True ) -> Player: @@ -1135,7 +1136,7 @@ async def create_group_player( ) return await provider_instance.create_group_player(name, members, dynamic) - @api_command("players/remove_group_player", required_role="admin") + @api_command("players/remove_group_player", required_permissions=[Permission.PLAYERS_CONFIGURE]) async def remove_group_player(self, player_id: str) -> None: """Remove a group player.""" if not (player := self.get_player(player_id)): @@ -1360,7 +1361,7 @@ async def unregister(self, player_id: str, permanent: bool = False) -> None: # Schedule debounced update of all players since can_group_with values may change self._schedule_update_all_players() - @api_command("players/remove", required_role="admin") + @api_command("players/remove", required_permissions=[Permission.PLAYERS_CONFIGURE]) async def remove(self, player_id: str) -> None: """ Remove a player from a provider. diff --git a/music_assistant/controllers/webserver/api_docs.py b/music_assistant/controllers/webserver/api_docs.py index 104cf1b3da..3b548df361 100644 --- a/music_assistant/controllers/webserver/api_docs.py +++ b/music_assistant/controllers/webserver/api_docs.py @@ -1117,6 +1117,7 @@ def generate_commands_json(command_handlers: dict[str, APICommandHandler]) -> li "return_type": str, # Return type "authenticated": bool, # Whether authentication is required "required_role": str | None, # Required user role (if any) + "required_permissions": list[str] | None, # Required permission scopes (if any) } """ commands_data = [] @@ -1171,6 +1172,11 @@ def generate_commands_json(command_handlers: dict[str, APICommandHandler]) -> li "return_type": return_type_str, "authenticated": handler.authenticated, "required_role": handler.required_role, + "required_permissions": ( + [p.value for p in handler.required_permissions] + if handler.required_permissions + else None + ), } ) diff --git a/music_assistant/controllers/webserver/auth.py b/music_assistant/controllers/webserver/auth.py index c644511ebc..e81653cecc 100644 --- a/music_assistant/controllers/webserver/auth.py +++ b/music_assistant/controllers/webserver/auth.py @@ -42,6 +42,7 @@ from music_assistant.helpers.datetime import utc from music_assistant.helpers.json import json_dumps, json_loads from music_assistant.helpers.jwt_auth import JWTHelper +from music_assistant.helpers.permissions import Permission if TYPE_CHECKING: from music_assistant.controllers.webserver import WebserverController @@ -443,7 +444,10 @@ async def authenticate_with_token(self, token: str) -> User | None: updates, ) - return await self.get_user(user_id) + user = await self.get_user(user_id) + if user: + self._attach_permissions(user, payload) + return user except pyjwt.ExpiredSignatureError: if token_id := self.jwt_helper.get_token_id(token): @@ -485,8 +489,29 @@ async def authenticate_with_token(self, token: str) -> User | None: legacy_updates, ) - # Get user - return await self.get_user(token_row["user_id"]) + # Get user (legacy token — generate permissions from role) + user = await self.get_user(token_row["user_id"]) + if user: + self._attach_permissions(user) + return user + + def _attach_permissions(self, user: User, jwt_payload: dict[str, Any] | None = None) -> None: + """Attach permission and claims data to a User object. + + When a JWT payload is available, permissions and claims are extracted + from it. Otherwise, permissions are generated from the user's role. + + :param user: User object to attach permissions to. + :param jwt_payload: Decoded JWT payload (if available). + """ + from music_assistant.helpers.permissions import get_permissions_for_role # noqa: PLC0415 + + if jwt_payload and "permissions" in jwt_payload: + user.permissions = jwt_payload["permissions"] # type: ignore[attr-defined] + user.claims = jwt_payload # type: ignore[attr-defined] + else: + user.permissions = get_permissions_for_role(user.role) # type: ignore[attr-defined] + user.claims = {} # type: ignore[attr-defined] async def get_token_id_from_token(self, token: str) -> str | None: """ @@ -506,7 +531,7 @@ async def get_token_id_from_token(self, token: str) -> str | None: return None return str(token_row["token_id"]) - @api_command("auth/user", required_role="admin") + @api_command("auth/user", required_permissions=[Permission.USERS_MANAGE]) async def get_user(self, user_id: str) -> User | None: """ Get user by ID (admin only). @@ -976,7 +1001,7 @@ async def get_user_tokens(self, user_id: str | None = None) -> list[AuthToken]: ) return [AuthToken.from_dict(dict(row)) for row in token_rows] - @api_command("auth/users", required_role="admin") + @api_command("auth/users", required_permissions=[Permission.USERS_MANAGE]) async def list_users(self) -> list[User]: """ Get all users (admin only). @@ -1029,7 +1054,7 @@ async def update_user_role(self, user_id: str, new_role: UserRole, admin_user: U ) return True - @api_command("auth/user/enable", required_role="admin") + @api_command("auth/user/enable", required_permissions=[Permission.USERS_MANAGE]) async def enable_user(self, user_id: str) -> None: """ Enable user account (admin only). @@ -1042,7 +1067,7 @@ async def enable_user(self, user_id: str) -> None: {"enabled": 1}, ) - @api_command("auth/user/disable", required_role="admin") + @api_command("auth/user/disable", required_permissions=[Permission.USERS_MANAGE]) async def disable_user(self, user_id: str) -> None: """ Disable user account (admin only). @@ -1249,7 +1274,7 @@ async def create_long_lived_token(self, name: str, user_id: str | None = None) - self.logger.info("Created long-lived token '%s' for user '%s'", name, target_user.username) return token - @api_command("auth/user/create", required_role="admin") + @api_command("auth/user/create", required_permissions=[Permission.USERS_MANAGE]) async def create_user_with_api( self, username: str, @@ -1310,7 +1335,7 @@ async def create_user_with_api( self.logger.info("User created by admin: %s (role: %s)", username, role) return user - @api_command("auth/user/delete", required_role="admin") + @api_command("auth/user/delete", required_permissions=[Permission.USERS_MANAGE]) async def delete_user(self, user_id: str) -> None: """ Delete user account (admin only). @@ -1522,7 +1547,7 @@ async def get_my_providers(self) -> list[dict[str, Any]]: providers = [UserAuthProvider.from_dict(dict(row)) for row in rows] return [p.to_dict() for p in providers] - @api_command("auth/user/unlink_provider", required_role="admin") + @api_command("auth/user/unlink_provider", required_permissions=[Permission.USERS_MANAGE]) async def unlink_provider(self, user_id: str, provider_type: str) -> bool: """ Unlink authentication provider from user (admin only). @@ -1748,7 +1773,7 @@ async def exchange_join_code(self, code: str) -> dict[str, Any]: "error": "Failed to create access token", } - @api_command("auth/join_codes", required_role="admin") + @api_command("auth/join_codes", required_permissions=[Permission.USERS_MANAGE]) async def list_join_codes(self, user_id: str | None = None) -> list[dict[str, Any]]: """List join codes, optionally filtered by user (admin only). @@ -1759,7 +1784,7 @@ async def list_join_codes(self, user_id: str | None = None) -> list[dict[str, An rows = await self.database.get_rows("join_codes", filter_args, limit=100) return [dict(row) for row in rows] - @api_command("auth/join_code/revoke", required_role="admin") + @api_command("auth/join_code/revoke", required_permissions=[Permission.USERS_MANAGE]) async def revoke_join_code(self, code_id: str) -> None: """Revoke a specific join code (admin only). diff --git a/music_assistant/controllers/webserver/controller.py b/music_assistant/controllers/webserver/controller.py index 2c04994e52..85c28e184d 100644 --- a/music_assistant/controllers/webserver/controller.py +++ b/music_assistant/controllers/webserver/controller.py @@ -565,9 +565,18 @@ async def _handle_jsonrpc_api_command(self, request: web.Request) -> web.Respons headers={"WWW-Authenticate": 'Bearer realm="Music Assistant"'}, ) - # Set user in context and check role + # Set user in context and check permissions/role set_current_user(user) - if handler.required_role == "admin" and user.role != UserRole.ADMIN: + if handler.required_permissions: + from music_assistant.helpers.permissions import has_permission # noqa: PLC0415 + + if not has_permission(user, *handler.required_permissions): + perm_names = [p.value for p in handler.required_permissions] + return web.Response( + status=403, + text=f"Missing required permissions: {perm_names}", + ) + elif handler.required_role == "admin" and user.role != UserRole.ADMIN: return web.Response( status=403, text="Admin access required", diff --git a/music_assistant/controllers/webserver/remote_access/__init__.py b/music_assistant/controllers/webserver/remote_access/__init__.py index bae3999206..81307ca8d2 100644 --- a/music_assistant/controllers/webserver/remote_access/__init__.py +++ b/music_assistant/controllers/webserver/remote_access/__init__.py @@ -17,6 +17,7 @@ from music_assistant.constants import CONF_CORE from music_assistant.controllers.webserver.remote_access.gateway import WebRTCGateway +from music_assistant.helpers.permissions import Permission from music_assistant.helpers.webrtc_certificate import ( get_or_create_webrtc_certificate, get_remote_id_from_certificate, @@ -269,11 +270,15 @@ async def configure_remote_access(enabled: bool) -> RemoteAccessInfo: self._on_unload_callbacks.append( self.mass.register_api_command( - "remote_access/info", get_remote_access_info, required_role="admin" + "remote_access/info", + get_remote_access_info, + required_permissions=[Permission.CONFIG_READ], ) ) self._on_unload_callbacks.append( self.mass.register_api_command( - "remote_access/configure", configure_remote_access, required_role="admin" + "remote_access/configure", + configure_remote_access, + required_permissions=[Permission.CONFIG_WRITE], ) ) diff --git a/music_assistant/controllers/webserver/websocket_client.py b/music_assistant/controllers/webserver/websocket_client.py index 12700149a8..c49c9b5fd8 100644 --- a/music_assistant/controllers/webserver/websocket_client.py +++ b/music_assistant/controllers/webserver/websocket_client.py @@ -208,8 +208,22 @@ async def _handle_command(self, msg: CommandMessage) -> None: set_current_token(self._current_token) set_sendspin_player_id(self._sendspin_player_id) - # Check role if required - if handler.required_role == "admin": + # Check permissions if required (takes precedence over role check) + if handler.required_permissions: + from music_assistant.helpers.permissions import has_permission # noqa: PLC0415 + + if not has_permission(self._authenticated_user, *handler.required_permissions): + perm_names = [p.value for p in handler.required_permissions] + await self._send_message( + ErrorResultMessage( + msg.message_id, + InsufficientPermissions.error_code, + f"Missing required permissions: {perm_names}", + ) + ) + return + # Fall back to role check for commands without permissions + elif handler.required_role == "admin": if self._authenticated_user.role != UserRole.ADMIN: await self._send_message( ErrorResultMessage( diff --git a/music_assistant/helpers/api.py b/music_assistant/helpers/api.py index 83e30b5f11..97c3805197 100644 --- a/music_assistant/helpers/api.py +++ b/music_assistant/helpers/api.py @@ -187,6 +187,7 @@ class APICommandHandler: target: Callable[..., Coroutine[Any, Any, Any] | AsyncGenerator[Any, Any]] authenticated: bool = True required_role: str | None = None # "admin" or "user" or None + required_permissions: list[Any] | None = None # list of Permission enum values alias: bool = False # If True, this is an alias for backward compatibility @classmethod @@ -196,6 +197,7 @@ def parse( func: Callable[..., Coroutine[Any, Any, Any] | AsyncGenerator[Any, Any]], authenticated: bool = True, required_role: str | None = None, + required_permissions: list[Any] | None = None, alias: bool = False, ) -> APICommandHandler: """Parse APICommandHandler by providing a function. @@ -205,6 +207,7 @@ def parse( :param authenticated: Whether authentication is required (default: True). :param required_role: Required user role ("admin" or "user") None for any authenticated user. + :param required_permissions: List of Permission scopes required to call this command. :param alias: Whether this is an alias for backward compatibility (default: False). """ type_hints = get_type_hints(func) @@ -267,24 +270,30 @@ def parse( target=func, authenticated=authenticated, required_role=required_role, + required_permissions=required_permissions, alias=alias, ) def api_command( - command: str, authenticated: bool = True, required_role: str | None = None + command: str, + authenticated: bool = True, + required_role: str | None = None, + required_permissions: list[Any] | None = None, ) -> Callable[[_F], _F]: """Decorate a function as API route/command. :param command: The command name/path. :param authenticated: Whether authentication is required (default: True). :param required_role: Required user role ("admin" or "user"), None means any authenticated user. + :param required_permissions: List of Permission scopes required to call this command. """ def decorate(func: _F) -> _F: func.api_cmd = command # type: ignore[attr-defined] func.api_authenticated = authenticated # type: ignore[attr-defined] func.api_required_role = required_role # type: ignore[attr-defined] + func.api_required_permissions = required_permissions # type: ignore[attr-defined] return func return decorate diff --git a/music_assistant/helpers/jwt_auth.py b/music_assistant/helpers/jwt_auth.py index f7054f5a44..177bf5a30f 100644 --- a/music_assistant/helpers/jwt_auth.py +++ b/music_assistant/helpers/jwt_auth.py @@ -20,6 +20,7 @@ import jwt from music_assistant.helpers.datetime import utc +from music_assistant.helpers.permissions import get_permissions_for_role if TYPE_CHECKING: from music_assistant_models.auth import User @@ -63,6 +64,7 @@ def encode_token( "exp": int(expires_at.timestamp()), "username": user.username, "role": user.role.value, + "permissions": get_permissions_for_role(user.role), "token_name": token_name, "is_long_lived": is_long_lived, } diff --git a/music_assistant/helpers/permissions.py b/music_assistant/helpers/permissions.py new file mode 100644 index 0000000000..1fd5f6b8cb --- /dev/null +++ b/music_assistant/helpers/permissions.py @@ -0,0 +1,120 @@ +"""Permission and authorization helpers for Music Assistant. + +This module provides a claims-based permission system that works with both: +- Internal authentication (permissions generated from user roles) +- External OIDC providers (permissions provided in JWT claims) + +Future providers can also contribute custom claims to user tokens. +""" + +from __future__ import annotations + +from enum import Enum +from typing import TYPE_CHECKING, Any + +from music_assistant_models.auth import UserRole + +if TYPE_CHECKING: + from music_assistant_models.auth import User + + +class Permission(str, Enum): + """Permission scopes for Music Assistant operations. + + Permissions follow the format: resource:action + Example: library:read, players:control + """ + + # Wildcard (admin) + ALL = "*" + + # Music library permissions + LIBRARY_READ = "library:read" + LIBRARY_WRITE = "library:write" + LIBRARY_DELETE = "library:delete" + + # Player permissions + PLAYERS_READ = "players:read" + PLAYERS_CONTROL = "players:control" + PLAYERS_CONFIGURE = "players:configure" + + # User management permissions + USERS_READ = "users:read" + USERS_MANAGE = "users:manage" + + # Configuration permissions + CONFIG_READ = "config:read" + CONFIG_WRITE = "config:write" + + # Provider management permissions + PROVIDERS_READ = "providers:read" + PROVIDERS_MANAGE = "providers:manage" + + # Metadata permissions + METADATA_READ = "metadata:read" + METADATA_REFRESH = "metadata:refresh" + + # Stream permissions + STREAMS_READ = "streams:read" + STREAMS_CONTROL = "streams:control" + + +def get_permissions_for_role(role: UserRole) -> list[str]: + """Get permission claims for a user role. + + :param role: User role to get permissions for. + :return: List of permission scope strings. + """ + if role == UserRole.ADMIN: + return [Permission.ALL] + + return [ + Permission.LIBRARY_READ, + Permission.PLAYERS_READ, + Permission.PLAYERS_CONTROL, + Permission.CONFIG_READ, + Permission.PROVIDERS_READ, + Permission.METADATA_READ, + Permission.STREAMS_READ, + Permission.STREAMS_CONTROL, + ] + + +def has_permission(user: User, *required: Permission) -> bool: + """Check if user has all required permissions. + + Checks the user's permissions list first. If empty, falls back to + generating permissions from the user's role. This allows external + OIDC tokens to supply their own permission claims while maintaining + backward compatibility with role-based auth. + + :param user: User to check permissions for. + :param required: One or more required permissions. + :return: True if user has all required permissions. + """ + user_permissions: list[str] = getattr(user, "permissions", None) or [] + + if not user_permissions: + user_permissions = get_permissions_for_role(user.role) + + if Permission.ALL.value in user_permissions: + return True + + return all(perm.value in user_permissions for perm in required) + + +def get_user_claim(user: User, claim: str, default: Any = None) -> Any: + """Get a custom claim from user's JWT token. + + Useful for provider-contributed claims like: + - spotify:premium + - sonos:features + - tidal:subscription_tier + + :param user: User to get claim from. + :param claim: Claim name (e.g., "spotify:premium"). + :param default: Default value if claim not found. + :return: Claim value or default. + """ + claims: dict[str, Any] = getattr(user, "claims", {}) or {} + return claims.get(claim, default) diff --git a/music_assistant/mass.py b/music_assistant/mass.py index e2ec01a175..35264f6224 100644 --- a/music_assistant/mass.py +++ b/music_assistant/mass.py @@ -54,6 +54,7 @@ from music_assistant.helpers.aiohttp_client import create_clientsession from music_assistant.helpers.api import APICommandHandler, api_command from music_assistant.helpers.images import get_icon_string +from music_assistant.helpers.permissions import Permission from music_assistant.helpers.util import ( TaskManager, get_ip_pton, @@ -342,7 +343,7 @@ def get_providers( ) ] - @api_command("logging/get", required_role=UserRole.ADMIN) + @api_command("logging/get", required_permissions=[Permission.CONFIG_READ]) async def get_application_log(self) -> str: """Return the application log from file.""" logfile = os.path.join(self.storage_path, "musicassistant.log") @@ -611,6 +612,7 @@ def register_api_command( handler: Callable[..., Coroutine[Any, Any, Any] | AsyncGenerator[Any, Any]], authenticated: bool = True, required_role: str | None = None, + required_permissions: list[Any] | None = None, alias: bool = False, ) -> Callable[[], None]: """Dynamically register a command on the API. @@ -620,6 +622,7 @@ def register_api_command( :param authenticated: Whether authentication is required (default: True). :param required_role: Required user role ("admin" or "user") None means any authenticated user. + :param required_permissions: List of Permission scopes required to call this command. :param alias: Whether this is an alias for backward compatibility (default: False). Aliases are not shown in API documentation but remain functional. @@ -629,7 +632,7 @@ def register_api_command( msg = f"Command {command} is already registered" raise RuntimeError(msg) self.command_handlers[command] = APICommandHandler.parse( - command, handler, authenticated, required_role, alias + command, handler, authenticated, required_role, required_permissions, alias ) def unregister() -> None: @@ -809,7 +812,14 @@ def _register_api_commands(self) -> None: # method is decorated with our api decorator authenticated = getattr(obj, "api_authenticated", True) required_role = getattr(obj, "api_required_role", None) - self.register_api_command(obj.api_cmd, obj, authenticated, required_role) + required_permissions = getattr(obj, "api_required_permissions", None) + self.register_api_command( + obj.api_cmd, + obj, + authenticated, + required_role, + required_permissions, + ) async def _load_builtin_providers(self) -> None: """ diff --git a/tests/core/test_permissions.py b/tests/core/test_permissions.py new file mode 100644 index 0000000000..dd41e93306 --- /dev/null +++ b/tests/core/test_permissions.py @@ -0,0 +1,153 @@ +"""Tests for the claims-based permission system.""" + +from __future__ import annotations + +from typing import Any + +from music_assistant_models.auth import User, UserRole + +from music_assistant.helpers.permissions import ( + Permission, + get_permissions_for_role, + get_user_claim, + has_permission, +) + + +def _make_user( + role: UserRole = UserRole.USER, + permissions: list[str] | None = None, + claims: dict[str, Any] | None = None, +) -> User: + """Create a mock User for testing. + + :param role: User role. + :param permissions: Optional explicit permissions list. + :param claims: Optional JWT claims dict. + """ + user = User(user_id="test-user", username="testuser", role=role) + if permissions is not None: + user.permissions = permissions # type: ignore[attr-defined] + if claims is not None: + user.claims = claims # type: ignore[attr-defined] + return user + + +class TestPermissionEnum: + """Tests for Permission enum values.""" + + def test_wildcard_value(self) -> None: + """Wildcard permission should be '*'.""" + assert Permission.ALL.value == "*" + + def test_resource_action_format(self) -> None: + """All non-wildcard permissions should follow resource:action format.""" + for perm in Permission: + if perm == Permission.ALL: + continue + assert ":" in perm.value, f"{perm.name} does not follow resource:action format" + + def test_string_enum(self) -> None: + """Permission should be a string enum for JSON serialization.""" + assert isinstance(Permission.LIBRARY_READ, str) + assert Permission.LIBRARY_READ.value == "library:read" + + +class TestGetPermissionsForRole: + """Tests for role-to-permissions mapping.""" + + def test_admin_gets_wildcard(self) -> None: + """Admin role should receive wildcard permission only.""" + perms = get_permissions_for_role(UserRole.ADMIN) + assert perms == [Permission.ALL] + + def test_user_gets_read_and_control(self) -> None: + """Regular user should get read and control permissions.""" + perms = get_permissions_for_role(UserRole.USER) + assert Permission.LIBRARY_READ in perms + assert Permission.PLAYERS_READ in perms + assert Permission.PLAYERS_CONTROL in perms + assert Permission.STREAMS_CONTROL in perms + + def test_user_lacks_admin_permissions(self) -> None: + """Regular user should not have configuration or management permissions.""" + perms = get_permissions_for_role(UserRole.USER) + assert Permission.LIBRARY_WRITE not in perms + assert Permission.LIBRARY_DELETE not in perms + assert Permission.PLAYERS_CONFIGURE not in perms + assert Permission.USERS_MANAGE not in perms + assert Permission.CONFIG_WRITE not in perms + assert Permission.PROVIDERS_MANAGE not in perms + + def test_guest_same_as_user(self) -> None: + """Guest role should fall through to the regular user permissions.""" + perms = get_permissions_for_role(UserRole.GUEST) + user_perms = get_permissions_for_role(UserRole.USER) + assert perms == user_perms + + +class TestHasPermission: + """Tests for permission checking.""" + + def test_admin_has_all_permissions(self) -> None: + """Admin wildcard should grant any permission.""" + user = _make_user(UserRole.ADMIN, permissions=["*"]) + assert has_permission(user, Permission.LIBRARY_DELETE) + assert has_permission(user, Permission.USERS_MANAGE) + assert has_permission(user, Permission.CONFIG_WRITE) + + def test_user_has_granted_permission(self) -> None: + """User should pass check for explicitly granted permissions.""" + user = _make_user(permissions=["library:read", "players:control"]) + assert has_permission(user, Permission.LIBRARY_READ) + assert has_permission(user, Permission.PLAYERS_CONTROL) + + def test_user_lacks_missing_permission(self) -> None: + """User should fail check for permissions not in their list.""" + user = _make_user(permissions=["library:read"]) + assert not has_permission(user, Permission.LIBRARY_WRITE) + assert not has_permission(user, Permission.CONFIG_WRITE) + + def test_multiple_required_all_must_match(self) -> None: + """All required permissions must be present for the check to pass.""" + user = _make_user(permissions=["library:read", "library:write"]) + assert has_permission(user, Permission.LIBRARY_READ, Permission.LIBRARY_WRITE) + assert not has_permission(user, Permission.LIBRARY_READ, Permission.LIBRARY_DELETE) + + def test_fallback_to_role_when_no_permissions(self) -> None: + """When no permissions are set, should fall back to role-based generation.""" + user = _make_user(UserRole.ADMIN) + # No explicit permissions set — should use role to generate (admin = wildcard) + assert has_permission(user, Permission.CONFIG_WRITE) + + def test_fallback_for_regular_user(self) -> None: + """Regular user with no explicit permissions should get role defaults.""" + user = _make_user(UserRole.USER) + assert has_permission(user, Permission.LIBRARY_READ) + assert not has_permission(user, Permission.LIBRARY_WRITE) + + def test_empty_permissions_list_triggers_fallback(self) -> None: + """An empty permissions list should trigger role-based fallback.""" + user = _make_user(UserRole.ADMIN, permissions=[]) + assert has_permission(user, Permission.CONFIG_WRITE) + + +class TestGetUserClaim: + """Tests for JWT claim extraction.""" + + def test_existing_claim(self) -> None: + """Should return claim value when present.""" + user = _make_user(claims={"spotify:premium": True, "tidal:tier": "hifi"}) + assert get_user_claim(user, "spotify:premium") is True + assert get_user_claim(user, "tidal:tier") == "hifi" + + def test_missing_claim_returns_default(self) -> None: + """Should return default when claim is not present.""" + user = _make_user(claims={}) + assert get_user_claim(user, "nonexistent") is None + assert get_user_claim(user, "nonexistent", "fallback") == "fallback" + + def test_no_claims_attribute(self) -> None: + """Should handle user with no claims attribute gracefully.""" + user = _make_user() + assert get_user_claim(user, "anything") is None