Skip to content
134 changes: 129 additions & 5 deletions music_assistant/helpers/audio.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from typing import TYPE_CHECKING, Any, Final, cast

import aiofiles
import aiohttp
import shortuuid
from aiohttp import ClientConnectorSSLError, ClientTimeout
from music_assistant_models.dsp import DSPConfig, DSPDetails, DSPState
Expand All @@ -31,10 +32,12 @@
InvalidDataError,
MediaNotFoundError,
MusicAssistantError,
ProviderPermissionDenied,
ProviderUnavailableError,
RetriesExhausted,
)
from music_assistant_models.media_items import AudioFormat
from music_assistant_models.streamdetails import MultiPartPath
from music_assistant_models.streamdetails import MultiPartPath, StreamMetadata

from music_assistant.constants import (
CONF_ENTRY_OUTPUT_LIMITER,
Expand All @@ -56,6 +59,9 @@
from .audio_buffer import AudioBuffer
from .dsp import filter_to_ffmpeg_params
from .ffmpeg import FFMpeg, get_ffmpeg_args, get_ffmpeg_stream
from .ogg_handler import (
get_chained_ogg_stream,
)
from .playlists import IsHLSPlaylist, PlaylistItem, fetch_playlist, parse_m3u
from .process import AsyncProcess, communicate
from .util import detect_charset
Expand Down Expand Up @@ -575,6 +581,44 @@ async def get_media_stream(
assert isinstance(streamdetails.path, str) # for type checking
audio_source = get_icy_radio_stream(mass, streamdetails.path, streamdetails)
seek_position = 0 # seeking not possible on radio streams
elif stream_type == StreamType.IN_BAND:
assert isinstance(streamdetails.path, str) # for type checking

# For IN_BAND (OGG/Opus) radio streams, use chained OGG handler.
# This handles the chained OGG format by stitching logical bitstreams together
# so FFmpeg sees a single continuous stream. Metadata is extracted in-band.
def _on_inband_metadata(metadata: dict[str, str]) -> None:
"""Handle metadata extracted from the OGG stream."""
title = metadata.get("title", "")
artist = metadata.get("artist", "")
album = metadata.get("album", "")
if not artist and " - " in title:
artist, title = title.split(" - ", 1)
if title or artist:
if artist and title:
stream_title = f"{artist} - {title}"
elif title:
stream_title = title
else:
stream_title = artist
cleaned_title = clean_stream_title(stream_title)
if cleaned_title and cleaned_title != streamdetails.stream_title:
LOGGER.log(
VERBOSE_LOG_LEVEL,
"In-band metadata: %s",
cleaned_title,
)
streamdetails.stream_title = cleaned_title
streamdetails.stream_metadata = StreamMetadata(
title=title or cleaned_title,
artist=artist or None,
album=album or None,
)

audio_source = get_chained_ogg_stream(
mass, streamdetails.path, metadata_callback=_on_inband_metadata
)
seek_position = 0 # seeking not possible on radio streams
elif stream_type == StreamType.HLS:
assert isinstance(streamdetails.path, str) # for type checking
substream = await get_hls_substream(mass, streamdetails.path)
Expand Down Expand Up @@ -606,14 +650,15 @@ async def get_media_stream(
finished = False
cancelled = False
first_chunk_received = False
ffmpeg_loglevel = "debug" if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) else "info"
ffmpeg_proc = FFMpeg(
audio_input=audio_source,
input_format=streamdetails.audio_format,
output_format=pcm_format,
filter_params=filter_params,
extra_input_args=extra_input_args,
collect_log_history=True,
loglevel="debug" if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) else "info",
loglevel=ffmpeg_loglevel,
)

try:
Expand Down Expand Up @@ -644,9 +689,14 @@ async def get_media_stream(
bytes_sent += len(chunk)

# end of audio/track reached
logger.log(VERBOSE_LOG_LEVEL, "End of stream reached.")
logger.debug("End of ffmpeg output stream reached for %s", streamdetails.uri)
# wait until stderr also completed reading
await ffmpeg_proc.wait_with_timeout(5)
logger.debug(
"FFmpeg process ended with return code %s for %s",
ffmpeg_proc.returncode,
streamdetails.uri,
)
if ffmpeg_proc.returncode not in (0, None):
log_trail = "\n".join(list(ffmpeg_proc.log_history)[-5:])
raise AudioError(f"FFMpeg exited with code {ffmpeg_proc.returncode}: {log_trail}")
Expand Down Expand Up @@ -769,6 +819,7 @@ async def _connect_radio_stream(

Some radio servers use outdated TLS configurations that reject modern cipher suites.
Since radio streams are public broadcast content, relaxing cipher requirements is acceptable.

:param mass: The MusicAssistant instance.
:param url: The radio stream URL to connect to.
:param kwargs: Additional keyword arguments passed to aiohttp get().
Expand All @@ -791,11 +842,11 @@ async def resolve_radio_stream(mass: MusicAssistant, url: str) -> tuple[str, Str
Resolve a streaming radio URL.

Unwraps any playlists if needed.
Determines if the stream supports ICY metadata.
Determines if the stream supports ICY metadata or in-band metadata.

Returns tuple;
- unfolded URL as string
- StreamType to determine ICY (radio) or HLS stream.
- StreamType to determine ICY (radio), HLS, or IN_BAND stream.
"""
if cache := await mass.cache.get(
key=url, provider=CACHE_PROVIDER, category=CACHE_CATEGORY_RESOLVED_RADIO_URL
Expand All @@ -814,8 +865,12 @@ async def resolve_radio_stream(mass: MusicAssistant, url: str) -> tuple[str, Str
resp.raise_for_status()
if not resp.headers:
raise InvalidDataError("no headers found")
content_type = headers.get("content-type", "")
if headers.get("icy-metaint") is not None:
stream_type = StreamType.ICY
elif content_type in ("application/ogg", "audio/ogg"):
# Ogg streams (Opus/Vorbis) have in-band metadata via Vorbis comments
stream_type = StreamType.IN_BAND
if (
url.endswith((".m3u", ".m3u8", ".pls"))
or ".m3u?" in url
Expand Down Expand Up @@ -901,6 +956,71 @@ async def get_icy_radio_stream(
streamdetails.stream_title = cleaned_stream_title


async def get_reconnecting_radio_stream(
mass: MusicAssistant, url: str
) -> AsyncGenerator[bytes, None]:
"""Yield continuous radio stream data, automatically reconnecting on disconnect.

:param mass: MusicAssistant instance.
:param url: URL of the radio stream.
"""
timeout = ClientTimeout(total=None, connect=30, sock_read=5 * 60)
reconnect_count = 0
max_reconnects = 1000 # Allow many reconnects for long-running radio

while reconnect_count <= max_reconnects:
try:
async with _connect_radio_stream(
mass, url, allow_redirects=True, headers=HTTP_HEADERS, timeout=timeout
) as resp:
chunk_count = 0
async for chunk in resp.content.iter_any():
chunk_count += 1
yield chunk

# Connection closed normally - reconnect
LOGGER.debug(
"Radio stream connection closed after %d chunks, reconnecting... "
"(reconnect #%d)",
chunk_count,
reconnect_count,
)
reconnect_count += 1
await asyncio.sleep(0.1) # Brief delay before reconnect

except asyncio.CancelledError:
LOGGER.debug("Radio stream cancelled for %s", url)
raise
except (
aiohttp.ClientConnectionError,
aiohttp.ClientPayloadError,
aiohttp.ServerDisconnectedError,
) as err:
# Transient network errors - retry
LOGGER.warning(
"Radio stream error (reconnect #%d): %s",
reconnect_count,
err,
)
reconnect_count += 1
if reconnect_count > max_reconnects:
raise RetriesExhausted(
f"Radio stream failed after {max_reconnects} reconnects: {err}"
) from err
await asyncio.sleep(0.5)
except aiohttp.ClientResponseError as err:
if err.status == 404:
raise MediaNotFoundError(f"Radio stream not found: {url}") from err
if err.status == 403:
raise ProviderPermissionDenied(f"Radio stream access denied: {url}") from err
# Other HTTP errors (5xx etc) - could be temporary
raise ProviderUnavailableError(
f"Radio stream returned HTTP {err.status}: {err}"
) from err

LOGGER.warning("Radio stream reached max reconnects (%d) for %s", max_reconnects, url)


def parse_extinf_metadata(extinf_line: str) -> dict[str, str]:
"""
Parse metadata from HLS EXTINF line.
Expand Down Expand Up @@ -1569,6 +1689,10 @@ async def analyze_loudness(
elif stream_type == StreamType.ICY:
assert isinstance(streamdetails.path, str) # for type checking
audio_source = get_icy_radio_stream(mass, streamdetails.path, streamdetails)
elif stream_type == StreamType.IN_BAND:
assert isinstance(streamdetails.path, str) # for type checking
# Use chained OGG handler for seamless playback across logical bitstreams
audio_source = get_chained_ogg_stream(mass, streamdetails.path, metadata_callback=None)
elif stream_type == StreamType.HLS:
assert isinstance(streamdetails.path, str) # for type checking
substream = await get_hls_substream(mass, streamdetails.path)
Expand Down
4 changes: 4 additions & 0 deletions music_assistant/helpers/ffmpeg.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ async def _log_reader_task(self) -> None:
if decode_errors >= 50:
self.logger.error(line)

# Log reconnection events for radio streams
if "Opening" in line or "Reconnect" in line or "reconnect" in line:
self.logger.debug("FFmpeg: %s", line)

if "Error during demuxing" in line:
# this can occur if using the concat demuxer for multipart files
# and should raise an exception to prevent false progress logging
Expand Down
Loading
Loading