diff --git a/music_assistant/helpers/audio.py b/music_assistant/helpers/audio.py index 81bd683cb8..37e72bdad2 100644 --- a/music_assistant/helpers/audio.py +++ b/music_assistant/helpers/audio.py @@ -33,7 +33,7 @@ ProviderUnavailableError, ) from music_assistant_models.media_items import AudioFormat -from music_assistant_models.streamdetails import MultiPartPath +from music_assistant_models.streamdetails import MultiPartPath, StreamMirror from music_assistant.constants import ( CONF_ENTRY_OUTPUT_LIMITER, @@ -553,9 +553,33 @@ async def get_media_stream( assert streamdetails.decryption_key is not None # for type checking extra_input_args += ["-decryption_key", streamdetails.decryption_key] if isinstance(streamdetails.path, list): - # multi part stream - audio_source = get_multi_file_stream(mass, streamdetails, seek_position) - seek_position = 0 # handled by get_multi_file_stream + logger.debug( + "media_stream path list len=%s first_type=%s", + len(streamdetails.path), + type(streamdetails.path[0]).__name__ if streamdetails.path else None, + ) + if all(isinstance(part, MultiPartPath) for part in streamdetails.path): + # multi part stream + audio_source = get_multi_file_stream(mass, streamdetails, seek_position) + seek_position = 0 # handled by get_multi_file_stream + elif all(isinstance(part, StreamMirror) for part in streamdetails.path): + # mirror URLs + LOGGER.debug( + "Using mirror stream list (count=%s) for %s", + len(streamdetails.path), + streamdetails.uri, + ) + audio_source = get_mirror_stream(mass, streamdetails) + seek_position = 0 # no seeking in a radio stream + else: + logger.error( + "Unsupported path list contents for %s: %s", + streamdetails.uri, + [type(p).__name__ for p in streamdetails.path], + ) + raise InvalidDataError( + "Streamdetails.path must be list of MultiPartPath or StreamMirror" + ) else: # regular single file/url stream assert isinstance(streamdetails.path, str) # for type checking @@ -1155,7 +1179,10 @@ async def get_multi_file_stream( """ if not isinstance(streamdetails.path, list): raise InvalidDataError("Multi-file streamdetails requires a list of MultiPartPath") - parts, seek_position = _get_parts_from_position(streamdetails.path, seek_position) + # type narrowing: we already validated path is a list of MultiPartPath + parts, seek_position = _get_parts_from_position( + cast("list[MultiPartPath]", streamdetails.path), seek_position + ) files_list = [part.path for part in parts] # concat input files @@ -1190,6 +1217,74 @@ async def get_multi_file_stream( await remove_file(temp_file) +async def get_mirror_stream( + mass: MusicAssistant, + streamdetails: StreamDetails, +) -> AsyncGenerator[bytes, None]: + """Return audio stream from mirrored URLs. + + Tries each mirror (ordered by priority, highest first) until one works. + """ + if not isinstance(streamdetails.path, list): + raise InvalidDataError("Mirror streamdetails requires a list of StreamMirror") + + # Validate and normalize mirrors + mirrors: list[StreamMirror] = [] + for m in streamdetails.path: + if not isinstance(m, StreamMirror): + raise InvalidDataError("Mirror streamdetails requires a list of StreamMirror") + if not isinstance(m.path, str): + raise InvalidDataError("StreamMirror.path must be a URL string") + mirrors.append(m) + + # Try mirrors sorted by priority (higher priority first). Sort is stable so + # mirrors with equal priority keep their original order. + mirrors.sort(key=lambda x: x.priority or 0, reverse=True) + + LOGGER.debug( + "Mirror stream start for %s with %s mirror(s)", + streamdetails.uri, + len(mirrors), + ) + + last_exception: Exception | None = None + for mirror in mirrors: + LOGGER.debug( + "Trying mirror %s (priority=%s) for %s", + mirror.path, + mirror.priority, + streamdetails.uri, + ) + try: + async for chunk in get_http_stream( + mass, + mirror.path, + streamdetails, + # There's no verify_ssl on StreamMirror; default to True. + verify_ssl=True, + ): + yield chunk + return + except Exception as err: + LOGGER.warning( + "Error streaming from mirror %s (priority=%s) for %s: %s", + mirror.path, + mirror.priority, + streamdetails.uri, + err, + exc_info=err, + ) + last_exception = err + + LOGGER.error( + "All mirror streams failed for %s (attempted %s)", + streamdetails.uri, + [m.path for m in mirrors], + exc_info=last_exception, + ) + raise AudioError("All mirror streams failed") from last_exception + + async def get_preview_stream( mass: MusicAssistant, provider_instance_id_or_domain: str, diff --git a/music_assistant/helpers/json.py b/music_assistant/helpers/json.py index f1aa4e9642..fc8e4da32b 100644 --- a/music_assistant/helpers/json.py +++ b/music_assistant/helpers/json.py @@ -2,7 +2,9 @@ import asyncio import base64 +import logging from _collections_abc import dict_keys, dict_values +from dataclasses import asdict, is_dataclass from types import MethodType from typing import Any, TypeVar @@ -10,6 +12,8 @@ import orjson from mashumaro.mixins.orjson import DataClassORJSONMixin +LOGGER = logging.getLogger(__name__) + JSON_ENCODE_EXCEPTIONS = (TypeError, ValueError) JSON_DECODE_EXCEPTIONS = (orjson.JSONDecodeError,) @@ -17,16 +21,44 @@ def get_serializable_value(obj: Any, raise_unhandled: bool = False) -> Any: - """Parse the value to its serializable equivalent.""" + """Parse the value to its serializable equivalent. + + This function will convert dataclasses, dicts and iterable containers into + JSON-serializable primitives. It intentionally returns primitives (lists, + dicts, strings, numbers) for any complex object that cannot be serialized + directly by orjson to avoid infinite recursion in the default serializer. + """ if getattr(obj, "do_not_serialize", None): return None + + # Convert dataclass *instances* to dicts so nested dataclasses get handled recursively. + # `is_dataclass` can also return True for dataclass *types*, so ensure we only + # pass instances to `asdict` to avoid type errors (mypy). + if is_dataclass(obj) and not isinstance(obj, type): + return get_serializable_value(asdict(obj)) + + # Handle plain dicts + if isinstance(obj, dict): + return {k: get_serializable_value(v) for k, v in obj.items()} + + # Handle iterable containers if ( isinstance(obj, list | set | filter | tuple | dict_values | dict_keys | dict_values) or obj.__class__ == "dict_valueiterator" ): return [get_serializable_value(x) for x in obj] + + # If an object provides an explicit to_dict use that if hasattr(obj, "to_dict"): return obj.to_dict() + + # Fallback to to_json if available + if hasattr(obj, "to_json"): + try: + return obj.to_json() + except Exception as exc: # pragma: no cover - defensive logging + LOGGER.debug("Failed to use to_json() for %s: %s", type(obj), exc) + if isinstance(obj, bytes): return base64.b64encode(obj).decode("ascii") if isinstance(obj, DO_NOT_SERIALIZE_TYPES): diff --git a/music_assistant/providers/digitally_incorporated/__init__.py b/music_assistant/providers/digitally_incorporated/__init__.py index 21166db21e..afb3170f18 100644 --- a/music_assistant/providers/digitally_incorporated/__init__.py +++ b/music_assistant/providers/digitally_incorporated/__init__.py @@ -38,7 +38,7 @@ SearchResults, UniqueList, ) -from music_assistant_models.streamdetails import StreamDetails +from music_assistant_models.streamdetails import StreamDetails, StreamMirror from music_assistant.controllers.cache import use_cache from music_assistant.helpers.throttle_retry import Throttler @@ -72,7 +72,7 @@ API_TIMEOUT = 30 CACHE_CHANNELS = 86400 # 24 hours CACHE_GENRES = 86400 # 24 hours -CACHE_STREAM_URL = 3600 # 1 hour +CACHE_STREAM_URLS = 3600 # 1 hour # Rate limiting RATE_LIMIT = 2 # requests per period @@ -334,7 +334,7 @@ async def get_stream_details(self, item_id: str, media_type: MediaType) -> Strea network_key, channel_key = self._validate_item_id(item_id) # Get the stream URL - stream_url = await self._get_stream_url(network_key, channel_key) + stream_url = await self._get_stream_urls(network_key, channel_key) return StreamDetails( provider=self.instance_id, @@ -343,7 +343,8 @@ async def get_stream_details(self, item_id: str, media_type: MediaType) -> Strea content_type=ContentType.UNKNOWN, # Let ffmpeg auto-detect ), media_type=MediaType.RADIO, - stream_type=StreamType.ICY, + # Use HTTP stream type with mirrors so we can try multiple URLs + stream_type=StreamType.HTTP, path=stream_url, allow_seek=False, can_seek=False, @@ -606,9 +607,9 @@ async def _get_genre_mapping(self, network_key: str) -> dict[int, str]: ) return {} - @use_cache(CACHE_STREAM_URL) - async def _get_stream_url(self, network_key: str, channel_key: str) -> str: - """Get the streaming URL for a channel.""" + @use_cache(CACHE_STREAM_URLS) + async def _get_stream_urls(self, network_key: str, channel_key: str) -> list[StreamMirror]: + """Get the streaming URLs for a channel.""" self.logger.debug("%s: Getting stream URL for %s:%s", self.domain, network_key, channel_key) listen_key = self.config.get_value("listen_key") @@ -622,28 +623,29 @@ async def _get_stream_url(self, network_key: str, channel_key: str) -> str: network_key, f"listen/premium_high/{channel_key}", use_https=True, **params ) - # Use the first stream URL from the playlist - self.logger.debug( - "%s: Digitally Incorporated playlist returned %d URLs", self.domain, len(playlist) - ) if not playlist or not isinstance(playlist, list): msg = f"{self.domain}: No stream URLs returned from Digitally Incorporated API" raise MediaNotFoundError(msg) - # Log all available URLs for debugging - for i, url in enumerate(playlist): - self.logger.debug("%s: Available stream URL %d: %s", self.domain, i + 1, url) + # Represent returned URLs as StreamMirror objects (explicit mirror semantics) + stream_list = [StreamMirror(url) for url in playlist if url and isinstance(url, str)] - # Use the first URL - Digitally Incorporated typically returns them in priority order - stream_url: str = str(playlist[0]) - self.logger.debug("%s: Selected stream URL: %s", self.domain, stream_url) + self.logger.debug( + "%s: Filtered %d valid stream URLs from playlist of %d URLs", + self.domain, + len(stream_list), + len(playlist), + ) - # Validate the stream URL - if not stream_url or not isinstance(stream_url, str): - msg = f"{self.domain}: Invalid stream URL received: {stream_url}" + if not stream_list: + msg = f"{self.domain}: No valid stream URLs found in the playlist" raise MediaNotFoundError(msg) - return stream_url + # Log all available URLs + for i, url in enumerate(stream_list): + self.logger.debug("%s: Available stream URL %d: %s", self.domain, i + 1, url) + + return stream_list except (ProviderUnavailableError, MediaNotFoundError): # Re-raise provider/media errors as-is (they already have domain prefix) diff --git a/tests/test_json_serialization.py b/tests/test_json_serialization.py new file mode 100644 index 0000000000..b1ab02332a --- /dev/null +++ b/tests/test_json_serialization.py @@ -0,0 +1,13 @@ +"""Tests for JSON serialization helpers.""" + +from music_assistant_models.streamdetails import MultiPartPath + +from music_assistant.helpers.json import json_dumps + + +def test_multipartpath_list_serializes() -> None: + """Ensure a list of MultiPartPath serializes to JSON.""" + data = [MultiPartPath("http://a"), MultiPartPath("b", 12.3)] + json_str = json_dumps(data) + assert "http://a" in json_str + assert "12.3" in json_str