Skip to content
180 changes: 173 additions & 7 deletions music_assistant/helpers/audio.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import aiofiles
import shortuuid
from aiohttp import ClientConnectorSSLError, ClientTimeout
from aiohttp import ClientConnectorSSLError, ClientError, ClientTimeout
from music_assistant_models.dsp import DSPConfig, DSPDetails, DSPState
from music_assistant_models.enums import (
ContentType,
Expand All @@ -34,7 +34,7 @@
ProviderUnavailableError,
)
from music_assistant_models.media_items import AudioFormat
from music_assistant_models.streamdetails import MultiPartPath
from music_assistant_models.streamdetails import MultiPartPath, StreamMetadata

from music_assistant.constants import (
CONF_ENTRY_OUTPUT_LIMITER,
Expand All @@ -56,6 +56,11 @@
from .audio_buffer import AudioBuffer
from .dsp import filter_to_ffmpeg_params
from .ffmpeg import FFMpeg, get_ffmpeg_args, get_ffmpeg_stream
from .ogg_handler import (
extract_metadata_from_page,
get_chained_ogg_stream,
parse_ogg_page,
)
from .playlists import IsHLSPlaylist, PlaylistItem, fetch_playlist, parse_m3u
from .process import AsyncProcess, communicate
from .util import detect_charset
Expand Down Expand Up @@ -348,12 +353,17 @@ async def get_stream_details(
resolved_url, stream_type = await resolve_radio_stream(mass, streamdetails.path)
streamdetails.path = resolved_url
streamdetails.stream_type = stream_type
# Set up metadata monitoring callback for HLS radio streams
# Set up metadata monitoring callback for radio streams
if stream_type == StreamType.HLS:
streamdetails.stream_metadata_update_callback = partial(
_update_hls_radio_metadata, mass
)
streamdetails.stream_metadata_update_interval = 5
elif stream_type == StreamType.IN_BAND:
streamdetails.stream_metadata_update_callback = partial(
_update_inband_radio_metadata, mass
)
streamdetails.stream_metadata_update_interval = 10
# handle volume normalization details
if result := await mass.music.get_loudness(
streamdetails.item_id,
Expand Down Expand Up @@ -573,6 +583,13 @@ async def get_media_stream(
assert isinstance(streamdetails.path, str) # for type checking
audio_source = get_icy_radio_stream(mass, streamdetails.path, streamdetails)
seek_position = 0 # seeking not possible on radio streams
elif stream_type == StreamType.IN_BAND:
assert isinstance(streamdetails.path, str) # for type checking
# For IN_BAND (OGG/Opus) radio streams, use chained OGG handler.
# This handles the chained OGG format by stitching logical bitstreams together
# so FFmpeg sees a single continuous stream. Metadata is extracted in-band.
audio_source = get_chained_ogg_stream(mass, streamdetails.path)
seek_position = 0 # seeking not possible on radio streams
elif stream_type == StreamType.HLS:
assert isinstance(streamdetails.path, str) # for type checking
substream = await get_hls_substream(mass, streamdetails.path)
Expand Down Expand Up @@ -604,14 +621,15 @@ async def get_media_stream(
finished = False
cancelled = False
first_chunk_received = False
ffmpeg_loglevel = "debug" if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) else "info"
ffmpeg_proc = FFMpeg(
audio_input=audio_source,
input_format=streamdetails.audio_format,
output_format=pcm_format,
filter_params=filter_params,
extra_input_args=extra_input_args,
collect_log_history=True,
loglevel="debug" if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) else "info",
loglevel=ffmpeg_loglevel,
)

try:
Expand Down Expand Up @@ -642,9 +660,14 @@ async def get_media_stream(
bytes_sent += len(chunk)

# end of audio/track reached
logger.log(VERBOSE_LOG_LEVEL, "End of stream reached.")
logger.debug("End of ffmpeg output stream reached for %s", streamdetails.uri)
# wait until stderr also completed reading
await ffmpeg_proc.wait_with_timeout(5)
logger.debug(
"FFmpeg process ended with return code %s for %s",
ffmpeg_proc.returncode,
streamdetails.uri,
)
if ffmpeg_proc.returncode not in (0, None):
log_trail = "\n".join(list(ffmpeg_proc.log_history)[-5:])
raise AudioError(f"FFMpeg exited with code {ffmpeg_proc.returncode}: {log_trail}")
Expand Down Expand Up @@ -767,6 +790,7 @@ async def _connect_radio_stream(

Some radio servers use outdated TLS configurations that reject modern cipher suites.
Since radio streams are public broadcast content, relaxing cipher requirements is acceptable.

:param mass: The MusicAssistant instance.
:param url: The radio stream URL to connect to.
:param kwargs: Additional keyword arguments passed to aiohttp get().
Expand All @@ -789,11 +813,11 @@ async def resolve_radio_stream(mass: MusicAssistant, url: str) -> tuple[str, Str
Resolve a streaming radio URL.

Unwraps any playlists if needed.
Determines if the stream supports ICY metadata.
Determines if the stream supports ICY metadata or in-band metadata.

Returns tuple;
- unfolded URL as string
- StreamType to determine ICY (radio) or HLS stream.
- StreamType to determine ICY (radio), HLS, or IN_BAND stream.
"""
if cache := await mass.cache.get(
key=url, provider=CACHE_PROVIDER, category=CACHE_CATEGORY_RESOLVED_RADIO_URL
Expand All @@ -812,8 +836,12 @@ async def resolve_radio_stream(mass: MusicAssistant, url: str) -> tuple[str, Str
resp.raise_for_status()
if not resp.headers:
raise InvalidDataError("no headers found")
content_type = headers.get("content-type", "")
if headers.get("icy-metaint") is not None:
stream_type = StreamType.ICY
elif content_type in ("application/ogg", "audio/ogg"):
# Ogg streams (Opus/Vorbis) have in-band metadata via Vorbis comments
stream_type = StreamType.IN_BAND
if (
url.endswith((".m3u", ".m3u8", ".pls"))
or ".m3u?" in url
Expand Down Expand Up @@ -899,6 +927,57 @@ async def get_icy_radio_stream(
streamdetails.stream_title = cleaned_stream_title


async def get_reconnecting_radio_stream(
mass: MusicAssistant, url: str
) -> AsyncGenerator[bytes, None]:
"""Yield continuous radio stream data, automatically reconnecting on disconnect.

:param mass: MusicAssistant instance.
:param url: URL of the radio stream.
"""
timeout = ClientTimeout(total=None, connect=30, sock_read=5 * 60)
reconnect_count = 0
max_reconnects = 1000 # Allow many reconnects for long-running radio

while reconnect_count <= max_reconnects:
try:
async with _connect_radio_stream(
mass, url, allow_redirects=True, headers=HTTP_HEADERS, timeout=timeout
) as resp:
chunk_count = 0
async for chunk in resp.content.iter_any():
chunk_count += 1
yield chunk

# Connection closed normally - reconnect
LOGGER.debug(
"Radio stream connection closed after %d chunks, reconnecting... "
"(reconnect #%d)",
chunk_count,
reconnect_count,
)
reconnect_count += 1
await asyncio.sleep(0.1) # Brief delay before reconnect

except asyncio.CancelledError:
LOGGER.debug("Radio stream cancelled for %s", url)
raise
except ClientError as err:
LOGGER.warning(
"Radio stream error (reconnect #%d): %s",
reconnect_count,
err,
)
reconnect_count += 1
if reconnect_count > max_reconnects:
raise ProviderUnavailableError(
f"Radio stream failed after {max_reconnects} reconnects: {err}"
) from err
await asyncio.sleep(0.5)

LOGGER.warning("Radio stream reached max reconnects (%d) for %s", max_reconnects, url)


def parse_extinf_metadata(extinf_line: str) -> dict[str, str]:
"""
Parse metadata from HLS EXTINF line.
Expand Down Expand Up @@ -1003,6 +1082,89 @@ async def _update_hls_radio_metadata(
)


async def _update_inband_radio_metadata(
mass: MusicAssistant,
streamdetails: StreamDetails,
elapsed_time: int, # noqa: ARG001
) -> None:
"""Sample OGG stream to extract Vorbis comment metadata."""
try:
assert isinstance(streamdetails.path, str) # for type checking
timeout = ClientTimeout(total=5, connect=5, sock_read=5)
station_name = (
streamdetails.stream_metadata.album if streamdetails.stream_metadata else None
)

async with _connect_radio_stream(
mass, streamdetails.path, allow_redirects=True, headers=HTTP_HEADERS, timeout=timeout
) as resp:
if station_name is None:
station_name = resp.headers.get("icy-name")
buffer = b""
bytes_read = 0
max_bytes = 32768 # Read up to 32KB to find metadata

async for chunk in resp.content.iter_any():
buffer += chunk
bytes_read += len(chunk)

# Parse OGG pages looking for metadata
while True:
result = parse_ogg_page(buffer, 0)
if result is None:
break

page, consumed = result
buffer = buffer[consumed:]
metadata = extract_metadata_from_page(page)

if metadata:
title = metadata.get("title", "")
artist = metadata.get("artist", "")
album = metadata.get("album", "") or station_name

if title or artist:
if artist and title:
stream_title = f"{artist} - {title}"
elif title:
stream_title = title
else:
stream_title = artist

cleaned_title = clean_stream_title(stream_title)

if cleaned_title != streamdetails.stream_title and cleaned_title:
LOGGER.log(
VERBOSE_LOG_LEVEL,
"In-band metadata changed: '%s' -> '%s'",
streamdetails.stream_title,
cleaned_title,
)
streamdetails.stream_metadata = StreamMetadata(
title=title or cleaned_title,
artist=artist or None,
album=album or None,
)
LOGGER.log(
VERBOSE_LOG_LEVEL,
"In-band radio metadata: %s (album: %s)",
cleaned_title,
album,
)
streamdetails.stream_title = cleaned_title
# Found metadata, we're done
return

# Stop if we've read enough
if bytes_read >= max_bytes:
break

except asyncio.CancelledError:
raise
except ClientError as err:
LOGGER.debug("Error fetching in-band metadata: %s", err)


async def get_hls_substream(
mass: MusicAssistant,
url: str,
Expand Down Expand Up @@ -1567,6 +1729,10 @@ async def analyze_loudness(
elif stream_type == StreamType.ICY:
assert isinstance(streamdetails.path, str) # for type checking
audio_source = get_icy_radio_stream(mass, streamdetails.path, streamdetails)
elif stream_type == StreamType.IN_BAND:
assert isinstance(streamdetails.path, str) # for type checking
# Use chained OGG handler for seamless playback across logical bitstreams
audio_source = get_chained_ogg_stream(mass, streamdetails.path)
elif stream_type == StreamType.HLS:
assert isinstance(streamdetails.path, str) # for type checking
substream = await get_hls_substream(mass, streamdetails.path)
Expand Down
4 changes: 4 additions & 0 deletions music_assistant/helpers/ffmpeg.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ async def _log_reader_task(self) -> None:
if decode_errors >= 50:
self.logger.error(line)

# Log reconnection events for radio streams
if "Opening" in line or "Reconnect" in line or "reconnect" in line:
self.logger.debug("FFmpeg: %s", line)

if "Error during demuxing" in line:
# this can occur if using the concat demuxer for multipart files
# and should raise an exception to prevent false progress logging
Expand Down
Loading
Loading