Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 80 additions & 1 deletion pkg/pipeline/builder/audio.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package builder
import (
"fmt"
"sync"
"time"

"github.com/go-gst/go-gst/gst"

Expand All @@ -36,6 +37,10 @@ const (

leakyQueue = true
blockingQueue = false

opusPlcMaxFrames = 5
opusDecStatsPollInterval = time.Second * 5
opusDecPlcMaxJitter = 3 * time.Millisecond
)

type AudioBin struct {
Expand Down Expand Up @@ -201,14 +206,25 @@ func (b *AudioBin) addAudioAppSrcBin(ts *config.TrackSource) error {
return errors.ErrGstPipelineError(err)
}

opusParse, err := gst.NewElement("opusparse")
if err != nil {
return errors.ErrGstPipelineError(err)
}

opusDec, err := gst.NewElement("opusdec")
if err != nil {
return errors.ErrGstPipelineError(err)
}

if err = appSrcBin.AddElements(rtpOpusDepay, opusDec); err != nil {
err = opusDec.SetProperty("plc", true)
if err != nil {
return errors.ErrGstPipelineError(err)
}

if err = appSrcBin.AddElements(rtpOpusDepay, opusParse, opusDec); err != nil {
return err
}
installOpusParseSrcProbe(opusParse, opusDec)

default:
return errors.ErrNotSupported(string(ts.MimeType))
Expand Down Expand Up @@ -397,3 +413,66 @@ func subscribeForQoS(mixer *gst.Element) {
}
})
}

func installOpusParseSrcProbe(opusParse *gst.Element, opusDec *gst.Element) {
src := opusParse.GetStaticPad("src")

var lastPTS, lastDur time.Duration
var lastPoll time.Time

src.AddProbe(gst.PadProbeTypeBuffer, func(p *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn {
buf := info.GetBuffer()
if buf == nil {
return gst.PadProbeOK
}

pts := time.Duration(buf.PresentationTimestamp())
dur := time.Duration(buf.Duration())

if dur <= 0 {
// Fallback if TOC wasn’t parsed (shouldn’t happen with opusparse)
if lastDur > 0 {
dur = lastDur
} else {
dur = 20 * time.Millisecond
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:nit: maybe make a const for default opus duration?

}
}

if lastDur > 0 {
expected := lastPTS + lastDur
if pts > expected {
gap := pts - expected
// Only trigger for at least ~one full frame gap
if gap+opusDecPlcMaxJitter >= lastDur {
// k missing frames (rounded)
k := int((gap + lastDur - 1) / lastDur)
if k < 1 {
k = 1
}
if k <= opusPlcMaxFrames {
missed := time.Duration(k) * lastDur
// Push GAP so opusdec generates PLC
gapEv := gst.NewGapEvent(gst.ClockTime(expected), gst.ClockTime(missed))
p.PushEvent(gapEv)
buf.SetFlags(buf.GetFlags() | gst.BufferFlagDiscont)
}
}
}
}
lastPTS, lastDur = pts, dur

// periodically gather stats from opusdec
if lastPoll.IsZero() || time.Since(lastPoll) >= opusDecStatsPollInterval {
stats, err := getOpusDecStats(opusDec)
if err != nil {
logger.Debugw("opusdec stats: parse error", "err", err)
return gst.PadProbeOK
}
postOpusDecStatsMessage(opusDec, stats)
lastPoll = time.Now()
}

return gst.PadProbeOK

})
}
129 changes: 129 additions & 0 deletions pkg/pipeline/builder/audio_stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package builder

import (
"fmt"
"log"
"regexp"
"strconv"
"time"

"github.com/go-gst/go-glib/glib"
"github.com/go-gst/go-gst/gst"
"github.com/livekit/protocol/logger"
)

const (
OpusDecStatsStructName = "livekit-opus-plc-stats"
OpusDecStatsKeyPlcDurationNs = "plc-duration-ns"
OpusDecStatsKeyPlcNumSamples = "plc-num-samples"
OpusDecStatsKeyNumGap = "num-gap"
OpusDecStatsKeyNumPushed = "num-pushed"
)

var (
reU64 = map[string]*regexp.Regexp{
"num-pushed": regexp.MustCompile(`\bnum-pushed=\(g?uint64\)(\d+)`),
"num-gap": regexp.MustCompile(`\bnum-gap=\(g?uint64\)(\d+)`),
"plc-num-samples": regexp.MustCompile(`\bplc-num-samples=\(g?uint64\)(\d+)`),
"plc-duration": regexp.MustCompile(`\bplc-duration=\(g?uint64\)(\d+)`), // ns
}
reU32 = map[string]*regexp.Regexp{
"sample-rate": regexp.MustCompile(`\bsample-rate=\(uint\)(\d+)`),
"channels": regexp.MustCompile(`\bchannels=\(uint\)(\d+)`),
}
)

type OpusDecStats struct {
NumPushed uint64
NumGap uint64
PlcNumSamples uint64
PlcDuration time.Duration // ns
SampleRate uint32
Channels uint32
}

func serializeOpusStats(opusdec *gst.Element) (string, bool) {
gvAny, err := opusdec.GetProperty("stats")
if err != nil {
log.Printf("opusdec stats: get failed: %v", err)
return "", false
}
switch v := gvAny.(type) {
case *gst.Structure:
return v.String(), true
case *glib.Value:
return gst.ValueSerialize(v), true
case string:
return v, true
default:
log.Printf("opusdec stats: unexpected type %T", gvAny)
return "", false
}
}

/*** minimal parser for the serialized GstStructure ***/

func parseStatsString(s string) (OpusDecStats, error) {
var st OpusDecStats

getU64 := func(k string) (uint64, error) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are these useful as some util in a utils package?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

interesting idea - a generic set of functions for finding an integer in a given regex might be interesting to have somewhere available - is there already a utils package you could recommend for this or you maybe referred to creating a utils package in this project?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We put a lot of utils in protocol repo. Would be good to put it there.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to extract it here: livekit/protocol#1195

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

if m := reU64[k].FindStringSubmatch(s); len(m) == 2 {
return strconv.ParseUint(m[1], 10, 64)
}
return 0, fmt.Errorf("missing %s", k)
}
getU32 := func(k string) (uint32, error) {
if m := reU32[k].FindStringSubmatch(s); len(m) == 2 {
v, _ := strconv.ParseUint(m[1], 10, 32)
return uint32(v), nil
}
return 0, fmt.Errorf("missing %s", k)
}

// Required
if v, err := getU64("num-pushed"); err == nil {
st.NumPushed = v
}
if v, err := getU64("num-gap"); err == nil {
st.NumGap = v
}
if v, err := getU64("plc-num-samples"); err == nil {
st.PlcNumSamples = v
}
if v, err := getU64("plc-duration"); err == nil {
st.PlcDuration = time.Duration(v) * time.Nanosecond
}

// Optional
if v, err := getU32("sample-rate"); err == nil {
st.SampleRate = v
}
if v, err := getU32("channels"); err == nil {
st.Channels = v
}
return st, nil
}

func getOpusDecStats(opusdec *gst.Element) (OpusDecStats, error) {
ser, ok := serializeOpusStats(opusdec)
if !ok {
return OpusDecStats{}, fmt.Errorf("serialize error")
}
return parseStatsString(ser)
}

func postOpusDecStatsMessage(src *gst.Element, stats OpusDecStats) {
s := gst.NewStructureFromString(
fmt.Sprintf("%s, %s=(guint64)%d, %s=(guint64)%d, %s=(guint64)%d, %s=(guint64)%d",
OpusDecStatsStructName,
OpusDecStatsKeyPlcDurationNs, stats.PlcDuration.Nanoseconds(),
OpusDecStatsKeyPlcNumSamples, stats.PlcNumSamples,
OpusDecStatsKeyNumGap, stats.NumGap,
OpusDecStatsKeyNumPushed, stats.NumPushed,
))
msg := gst.NewElementMessage(src, s)
sent := src.PostMessage(msg)
if !sent {
logger.Debugw("failed to send opusdec PLC stats")
}
}
9 changes: 9 additions & 0 deletions pkg/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ type Controller struct {
type controllerStats struct {
droppedAudioBuffers atomic.Uint64
droppedAudioDuration atomic.Duration
// opusdec stats
opusDecPLCDuration atomic.Duration
opusDecPLCSamples atomic.Uint64
opusDecPacketsPushed atomic.Uint64
opusDecGapPackets atomic.Uint64
}

func New(ctx context.Context, conf *config.PipelineConfig, ipcServiceClient ipc.EgressServiceClient) (*Controller, error) {
Expand Down Expand Up @@ -177,6 +182,10 @@ func (c *Controller) Run(ctx context.Context) *livekit.EgressInfo {
logger.Debugw("Audio QoS stats",
"audio buffers dropped", c.stats.droppedAudioBuffers.Load(),
"total audio duration dropped", c.stats.droppedAudioDuration.Load(),
"opusdec PLC duration", c.stats.opusDecPLCDuration.Load(),
"opusdec PLC samples", c.stats.opusDecPLCSamples.Load(),
"opusdec gap packets", c.stats.opusDecGapPackets.Load(),
"opusdec packets pushed", c.stats.opusDecPacketsPushed.Load(),
)
}()

Expand Down
42 changes: 42 additions & 0 deletions pkg/pipeline/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,8 @@ func (c *Controller) handleMessageElement(msg *gst.Message) error {
if err != nil {
return err
}
case builder.OpusDecStatsStructName:
c.handleOpusDecStats(s)
}
}

Expand All @@ -340,6 +342,46 @@ func (c *Controller) handleAudioMixerQoS(qosValues *gst.QoSValues) {
c.stats.droppedAudioDuration.Add(qosValues.Duration)
}

func (c *Controller) handleOpusDecStats(s *gst.Structure) {
dur, err := s.GetValue(builder.OpusDecStatsKeyPlcDurationNs)
if err != nil {
return
}
plcDurationNs, ok := dur.(uint64)
if !ok {
return
}
numSamples, err := s.GetValue(builder.OpusDecStatsKeyPlcNumSamples)
if err != nil {
return
}
plcNumSamples, ok := numSamples.(uint64)
if !ok {
return
}
numGap, err := s.GetValue(builder.OpusDecStatsKeyNumGap)
if err != nil {
return
}
plcNumGap, ok := numGap.(uint64)
if !ok {
return
}
pushed, err := s.GetValue(builder.OpusDecStatsKeyNumPushed)
if err != nil {
return
}
plcNumPushed, ok := pushed.(uint64)
if !ok {
return
}

c.stats.opusDecPLCDuration.Store(time.Duration(plcDurationNs) * time.Nanosecond)
c.stats.opusDecPLCSamples.Store(plcNumSamples)
c.stats.opusDecGapPackets.Store(plcNumGap)
c.stats.opusDecPacketsPushed.Store(plcNumPushed)
}

// Debug info comes in the following format:
// file.c(line): method_name (): /GstPipeline:pipeline/GstBin:bin_name/GstElement:element_name:\nError message
var gstDebug = regexp.MustCompile("(?s)(.*?)GstPipeline:pipeline/GstBin:(.*?)/(.*?):([^:]*)(:\n)?(.*)")
Expand Down