Skip to content
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 100 additions & 5 deletions music_assistant/helpers/audio.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
ProviderUnavailableError,
)
from music_assistant_models.media_items import AudioFormat
from music_assistant_models.streamdetails import MultiPartPath
from music_assistant_models.streamdetails import MultiPartPath, StreamMirror

from music_assistant.constants import (
CONF_ENTRY_OUTPUT_LIMITER,
Expand Down Expand Up @@ -553,9 +553,33 @@ async def get_media_stream(
assert streamdetails.decryption_key is not None # for type checking
extra_input_args += ["-decryption_key", streamdetails.decryption_key]
if isinstance(streamdetails.path, list):
# multi part stream
audio_source = get_multi_file_stream(mass, streamdetails, seek_position)
seek_position = 0 # handled by get_multi_file_stream
logger.debug(
"media_stream path list len=%s first_type=%s",
len(streamdetails.path),
type(streamdetails.path[0]).__name__ if streamdetails.path else None,
)
if all(isinstance(part, MultiPartPath) for part in streamdetails.path):
# multi part stream
audio_source = get_multi_file_stream(mass, streamdetails, seek_position)
seek_position = 0 # handled by get_multi_file_stream
elif all(isinstance(part, StreamMirror) for part in streamdetails.path):
# mirror URLs
LOGGER.debug(
"Using mirror stream list (count=%s) for %s",
len(streamdetails.path),
streamdetails.uri,
)
audio_source = get_mirror_stream(mass, streamdetails)
seek_position = 0 # no seeking in a radio stream
else:
logger.error(
"Unsupported path list contents for %s: %s",
streamdetails.uri,
[type(p).__name__ for p in streamdetails.path],
)
raise InvalidDataError(
"Streamdetails.path must be list of MultiPartPath or StreamMirror"
)
else:
# regular single file/url stream
assert isinstance(streamdetails.path, str) # for type checking
Expand Down Expand Up @@ -1156,7 +1180,10 @@ async def get_multi_file_stream(
"""
if not isinstance(streamdetails.path, list):
raise InvalidDataError("Multi-file streamdetails requires a list of MultiPartPath")
parts, seek_position = _get_parts_from_position(streamdetails.path, seek_position)
# type narrowing: we already validated path is a list of MultiPartPath
parts, seek_position = _get_parts_from_position(
cast("list[MultiPartPath]", streamdetails.path), seek_position
)
files_list = [part.path for part in parts]

# concat input files
Expand Down Expand Up @@ -1191,6 +1218,74 @@ async def get_multi_file_stream(
await remove_file(temp_file)


async def get_mirror_stream(
    mass: MusicAssistant,
    streamdetails: StreamDetails,
) -> AsyncGenerator[bytes, None]:
    """Return audio stream from mirrored URLs.

    Tries each mirror (ordered by priority, highest first) until one works.
    Once a mirror has delivered audio to the consumer, failover is no longer
    attempted: restarting the next mirror from byte 0 would inject duplicated
    audio into an already-playing stream.

    Raises:
        InvalidDataError: if ``streamdetails.path`` is not a list of
            ``StreamMirror`` objects whose ``path`` is a URL string.
        AudioError: if every mirror fails, or a mirror fails mid-stream.
    """
    if not isinstance(streamdetails.path, list):
        raise InvalidDataError("Mirror streamdetails requires a list of StreamMirror")

    # Validate and normalize mirrors before attempting any network I/O.
    mirrors: list[StreamMirror] = []
    for candidate in streamdetails.path:
        if not isinstance(candidate, StreamMirror):
            raise InvalidDataError("Mirror streamdetails requires a list of StreamMirror")
        if not isinstance(candidate.path, str):
            raise InvalidDataError("StreamMirror.path must be a URL string")
        mirrors.append(candidate)

    # Try mirrors sorted by priority (higher priority first). Python's sort is
    # stable, so mirrors with equal priority keep their original order.
    mirrors.sort(key=lambda m: m.priority or 0, reverse=True)

    LOGGER.debug(
        "Mirror stream start for %s with %s mirror(s)",
        streamdetails.uri,
        len(mirrors),
    )

    last_exception: Exception | None = None
    for mirror in mirrors:
        LOGGER.debug(
            "Trying mirror %s (priority=%s) for %s",
            mirror.path,
            mirror.priority,
            streamdetails.uri,
        )
        # Track whether this mirror already delivered audio to the consumer.
        streamed_any = False
        try:
            async for chunk in get_http_stream(
                mass,
                mirror.path,
                streamdetails,
                # There's no verify_ssl on StreamMirror; default to True.
                verify_ssl=True,
            ):
                streamed_any = True
                yield chunk
            return
        except Exception as err:
            if streamed_any:
                # BUGFIX: do not fail over once audio was yielded — the next
                # mirror would restart from the beginning and corrupt playback.
                LOGGER.error(
                    "Mirror %s failed mid-stream for %s",
                    mirror.path,
                    streamdetails.uri,
                )
                raise AudioError("Mirror stream failed mid-stream") from err
            LOGGER.warning(
                "Error streaming from mirror %s (priority=%s) for %s: %s",
                mirror.path,
                mirror.priority,
                streamdetails.uri,
                err,
                exc_info=err,
            )
            last_exception = err

    LOGGER.error(
        "All mirror streams failed for %s (attempted %s)",
        streamdetails.uri,
        [m.path for m in mirrors],
        exc_info=last_exception,
    )
    raise AudioError("All mirror streams failed") from last_exception


async def get_preview_stream(
mass: MusicAssistant,
provider_instance_id_or_domain: str,
Expand Down
34 changes: 33 additions & 1 deletion music_assistant/helpers/json.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,63 @@

import asyncio
import base64
import logging
from _collections_abc import dict_keys, dict_values
from dataclasses import asdict, is_dataclass
from types import MethodType
from typing import Any, TypeVar

import aiofiles
import orjson
from mashumaro.mixins.orjson import DataClassORJSONMixin

LOGGER = logging.getLogger(__name__)

JSON_ENCODE_EXCEPTIONS = (TypeError, ValueError)
JSON_DECODE_EXCEPTIONS = (orjson.JSONDecodeError,)

DO_NOT_SERIALIZE_TYPES = (MethodType, asyncio.Task)


def get_serializable_value(obj: Any, raise_unhandled: bool = False) -> Any:
"""Parse the value to its serializable equivalent."""
"""Parse the value to its serializable equivalent.

This function will convert dataclasses, dicts and iterable containers into
JSON-serializable primitives. It intentionally returns primitives (lists,
dicts, strings, numbers) for any complex object that cannot be serialized
directly by orjson to avoid infinite recursion in the default serializer.
"""
if getattr(obj, "do_not_serialize", None):
return None

# Convert dataclass *instances* to dicts so nested dataclasses get handled recursively.
# `is_dataclass` can also return True for dataclass *types*, so ensure we only
# pass instances to `asdict` to avoid type errors (mypy).
if is_dataclass(obj) and not isinstance(obj, type):
return get_serializable_value(asdict(obj))

# Handle plain dicts
if isinstance(obj, dict):
return {k: get_serializable_value(v) for k, v in obj.items()}

# Handle iterable containers
if (
isinstance(obj, list | set | filter | tuple | dict_values | dict_keys | dict_values)
or obj.__class__ == "dict_valueiterator"
Comment on lines 46 to 47
Copy link

Copilot AI Jan 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check is ineffective: obj.__class__ is a type, so comparing it to the string "dict_valueiterator" will always be False. If you need to special-case dict value iterators, compare type(obj).__name__ (or use an Iterator/Iterable check) instead. Also, dict_values is listed twice in the isinstance union.

Suggested change
isinstance(obj, list | set | filter | tuple | dict_values | dict_keys | dict_values)
or obj.__class__ == "dict_valueiterator"
isinstance(obj, list | set | filter | tuple | dict_values | dict_keys)
or type(obj).__name__ == "dict_valueiterator"

Copilot uses AI. Check for mistakes.
):
return [get_serializable_value(x) for x in obj]

# If an object provides an explicit to_dict use that
if hasattr(obj, "to_dict"):
return obj.to_dict()

# Fallback to to_json if available
if hasattr(obj, "to_json"):
try:
return obj.to_json()
except Exception as exc: # pragma: no cover - defensive logging
LOGGER.debug("Failed to use to_json() for %s: %s", type(obj), exc)

if isinstance(obj, bytes):
return base64.b64encode(obj).decode("ascii")
if isinstance(obj, DO_NOT_SERIALIZE_TYPES):
Expand Down
44 changes: 23 additions & 21 deletions music_assistant/providers/digitally_incorporated/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
SearchResults,
UniqueList,
)
from music_assistant_models.streamdetails import StreamDetails
from music_assistant_models.streamdetails import StreamDetails, StreamMirror

from music_assistant.controllers.cache import use_cache
from music_assistant.helpers.throttle_retry import Throttler
Expand Down Expand Up @@ -72,7 +72,7 @@
API_TIMEOUT = 30
CACHE_CHANNELS = 86400 # 24 hours
CACHE_GENRES = 86400 # 24 hours
CACHE_STREAM_URL = 3600 # 1 hour
CACHE_STREAM_URLS = 3600 # 1 hour

# Rate limiting
RATE_LIMIT = 2 # requests per period
Expand Down Expand Up @@ -334,7 +334,7 @@ async def get_stream_details(self, item_id: str, media_type: MediaType) -> Strea
network_key, channel_key = self._validate_item_id(item_id)

# Get the stream URL
stream_url = await self._get_stream_url(network_key, channel_key)
stream_url = await self._get_stream_urls(network_key, channel_key)

Comment on lines 336 to 338
Copy link

Copilot AI Jan 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stream_url now holds a list of StreamMirror entries (multiple URLs). Renaming to something like stream_urls/mirrors would make the type/intent clearer and avoid confusion for future readers.

Copilot uses AI. Check for mistakes.
return StreamDetails(
provider=self.instance_id,
Expand All @@ -343,7 +343,8 @@ async def get_stream_details(self, item_id: str, media_type: MediaType) -> Strea
content_type=ContentType.UNKNOWN, # Let ffmpeg auto-detect
),
media_type=MediaType.RADIO,
stream_type=StreamType.ICY,
# Use HTTP stream type with mirrors so we can try multiple URLs
stream_type=StreamType.HTTP,
path=stream_url,
Comment on lines 345 to 348
Copy link

Copilot AI Jan 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing the Digitally Incorporated radio stream from StreamType.ICY to StreamType.HTTP will bypass the ICY-specific streaming path (get_icy_radio_stream) and likely disables ICY metadata (stream title) updates. If these streams provide ICY metadata, consider keeping StreamType.ICY and adding mirror failover support for ICY streams (or enhancing the ICY streamer to accept a mirror list) instead of downgrading to plain HTTP.

Copilot uses AI. Check for mistakes.
allow_seek=False,
can_seek=False,
Expand Down Expand Up @@ -606,9 +607,9 @@ async def _get_genre_mapping(self, network_key: str) -> dict[int, str]:
)
return {}

@use_cache(CACHE_STREAM_URL)
async def _get_stream_url(self, network_key: str, channel_key: str) -> str:
"""Get the streaming URL for a channel."""
@use_cache(CACHE_STREAM_URLS)
async def _get_stream_urls(self, network_key: str, channel_key: str) -> list[StreamMirror]:
"""Get the streaming URLs for a channel."""
self.logger.debug("%s: Getting stream URL for %s:%s", self.domain, network_key, channel_key)
Copy link

Copilot AI Jan 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This debug line still says "Getting stream URL" even though the method now returns multiple URLs. Consider updating the message to "stream URLs" to match the new behavior and avoid confusion when troubleshooting.

Copilot uses AI. Check for mistakes.

listen_key = self.config.get_value("listen_key")
Expand All @@ -622,28 +623,29 @@ async def _get_stream_url(self, network_key: str, channel_key: str) -> str:
network_key, f"listen/premium_high/{channel_key}", use_https=True, **params
)

# Use the first stream URL from the playlist
self.logger.debug(
"%s: Digitally Incorporated playlist returned %d URLs", self.domain, len(playlist)
)
if not playlist or not isinstance(playlist, list):
msg = f"{self.domain}: No stream URLs returned from Digitally Incorporated API"
raise MediaNotFoundError(msg)

# Log all available URLs for debugging
for i, url in enumerate(playlist):
self.logger.debug("%s: Available stream URL %d: %s", self.domain, i + 1, url)
# Represent returned URLs as StreamMirror objects (explicit mirror semantics)
stream_list = [StreamMirror(url) for url in playlist if url and isinstance(url, str)]

# Use the first URL - Digitally Incorporated typically returns them in priority order
stream_url: str = str(playlist[0])
self.logger.debug("%s: Selected stream URL: %s", self.domain, stream_url)
self.logger.debug(
"%s: Filtered %d valid stream URLs from playlist of %d URLs",
self.domain,
len(stream_list),
len(playlist),
)

# Validate the stream URL
if not stream_url or not isinstance(stream_url, str):
msg = f"{self.domain}: Invalid stream URL received: {stream_url}"
if not stream_list:
msg = f"{self.domain}: No valid stream URLs found in the playlist"
raise MediaNotFoundError(msg)

return stream_url
# Log all available URLs
for i, url in enumerate(stream_list):
self.logger.debug("%s: Available stream URL %d: %s", self.domain, i + 1, url)
Copy link

Copilot AI Jan 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stream_list contains StreamMirror objects, but the debug log prints the object (%s, url) rather than the actual URL string. This makes logs harder to read and can omit the URL if StreamMirror.__str__ isn’t implemented. Log url.path (and optionally url.priority) instead.

Suggested change
self.logger.debug("%s: Available stream URL %d: %s", self.domain, i + 1, url)
self.logger.debug(
"%s: Available stream URL %d: %s (priority: %s)",
self.domain,
i + 1,
url.path,
url.priority,
)

Copilot uses AI. Check for mistakes.

return stream_list

except (ProviderUnavailableError, MediaNotFoundError):
# Re-raise provider/media errors as-is (they already have domain prefix)
Expand Down
13 changes: 13 additions & 0 deletions tests/test_json_serialization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
"""Tests for JSON serialization helpers."""

from music_assistant_models.streamdetails import MultiPartPath

from music_assistant.helpers.json import json_dumps


def test_multipartpath_list_serializes() -> None:
    """A list of MultiPartPath instances must round-trip through json_dumps."""
    parts = [MultiPartPath("http://a"), MultiPartPath("b", 12.3)]
    serialized = json_dumps(parts)
    assert "http://a" in serialized
    assert "12.3" in serialized
Loading