Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions api/docs/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2435,8 +2435,6 @@ const docTemplate = `{
"domain.EventType": {
"type": "string",
"enum": [
"session.opened",
"session.closed",
"stream.created",
"stream.updated",
"stream.started",
Expand Down Expand Up @@ -2470,7 +2468,9 @@ const docTemplate = `{
"template.updated",
"template.deleted",
"stream.runtime_created",
"stream.runtime_expired"
"stream.runtime_expired",
"session.opened",
"session.closed"
],
"x-enum-comments": {
"EventDVRSegmentPruned": "retention loop deleted an aged-out segment",
Expand All @@ -2487,8 +2487,6 @@ const docTemplate = `{
"EventStreamUpdated": "PUT /streams/{code} on existing record"
},
"x-enum-descriptions": [
"",
"",
"",
"PUT /streams/{code} on existing record",
"",
Expand Down Expand Up @@ -2522,11 +2520,11 @@ const docTemplate = `{
"",
"",
"",
"",
"",
""
],
"x-enum-varnames": [
"EventSessionOpened",
"EventSessionClosed",
"EventStreamCreated",
"EventStreamUpdated",
"EventStreamStarted",
Expand Down Expand Up @@ -2560,7 +2558,9 @@ const docTemplate = `{
"EventTemplateUpdated",
"EventTemplateDeleted",
"EventStreamRuntimeCreated",
"EventStreamRuntimeExpired"
"EventStreamRuntimeExpired",
"EventSessionOpened",
"EventSessionClosed"
]
},
"domain.GlobalConfig": {
Expand Down
16 changes: 8 additions & 8 deletions api/docs/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -2428,8 +2428,6 @@
"domain.EventType": {
"type": "string",
"enum": [
"session.opened",
"session.closed",
"stream.created",
"stream.updated",
"stream.started",
Expand Down Expand Up @@ -2463,7 +2461,9 @@
"template.updated",
"template.deleted",
"stream.runtime_created",
"stream.runtime_expired"
"stream.runtime_expired",
"session.opened",
"session.closed"
],
"x-enum-comments": {
"EventDVRSegmentPruned": "retention loop deleted an aged-out segment",
Expand All @@ -2480,8 +2480,6 @@
"EventStreamUpdated": "PUT /streams/{code} on existing record"
},
"x-enum-descriptions": [
"",
"",
"",
"PUT /streams/{code} on existing record",
"",
Expand Down Expand Up @@ -2515,11 +2513,11 @@
"",
"",
"",
"",
"",
""
],
"x-enum-varnames": [
"EventSessionOpened",
"EventSessionClosed",
"EventStreamCreated",
"EventStreamUpdated",
"EventStreamStarted",
Expand Down Expand Up @@ -2553,7 +2551,9 @@
"EventTemplateUpdated",
"EventTemplateDeleted",
"EventStreamRuntimeCreated",
"EventStreamRuntimeExpired"
"EventStreamRuntimeExpired",
"EventSessionOpened",
"EventSessionClosed"
]
},
"domain.GlobalConfig": {
Expand Down
12 changes: 6 additions & 6 deletions api/docs/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -629,8 +629,6 @@ definitions:
type: object
domain.EventType:
enum:
- session.opened
- session.closed
- stream.created
- stream.updated
- stream.started
Expand Down Expand Up @@ -665,6 +663,8 @@ definitions:
- template.deleted
- stream.runtime_created
- stream.runtime_expired
- session.opened
- session.closed
type: string
x-enum-comments:
EventDVRSegmentPruned: retention loop deleted an aged-out segment
Expand All @@ -682,8 +682,6 @@ definitions:
EventStreamUpdated: PUT /streams/{code} on existing record
x-enum-descriptions:
- ""
- ""
- ""
- PUT /streams/{code} on existing record
- ""
- ""
Expand Down Expand Up @@ -717,9 +715,9 @@ definitions:
- ""
- ""
- ""
- ""
- ""
x-enum-varnames:
- EventSessionOpened
- EventSessionClosed
- EventStreamCreated
- EventStreamUpdated
- EventStreamStarted
Expand Down Expand Up @@ -754,6 +752,8 @@ definitions:
- EventTemplateDeleted
- EventStreamRuntimeCreated
- EventStreamRuntimeExpired
- EventSessionOpened
- EventSessionClosed
domain.GlobalConfig:
properties:
buffer:
Expand Down
2 changes: 2 additions & 0 deletions docs/audit/security-quality-audit.md
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,8 @@ Covered as the HTTP-sink half of **S-2**. `batcher.go:414-446` POSTs to any oper

**Fix:** Un-latch track presence based on arrival liveness rather than init existence: record `lastVideoFrameAt`/`lastAudioFrameAt`; once past `WaitingForPairing`, compute `haveX := initX != nil && (queueLen>0 || now-lastXFrameAt < trackLossTimeout)` with `trackLossTimeout ≈ 6 s` (the existing safety-net deadline). Audio death → `haveAudio=false` → coupling skipped → video cuts resume; video death → `haveVideo=false` → `cutAudioOnly` engages. On a declared-dead track's first resumed frame, re-anchor its next segment tfdt to wallclock so the outage gap isn't baked in as permanent A/V desync.

> ✅ **FIXED** in `fix/dash-track-death` — `tryCut` now derives `haveVideo`/`haveAudio` from a new `liveTrackPresence(now)` instead of the latched `videoInit`/`audioInit` pointers. `handleH264`/`handleAAC` stamp `lastVideoFrameAt`/`lastAudioFrameAt` on every accepted frame; once the stream is `StateLive` (only — `WaitingForPairing`/`SessionBoundary` keep init-presence so the pairing handshake and post-reset stale timestamps aren't disturbed), a track with an empty queue and no arrival within `trackLossTimeout` (6 s, a package var so tests can shrink it) is declared dead. Audio death → coupling skipped (`buildCutDecision` audio block gates on `haveAudio`) → video cuts resume; video death → `cutAudioOnly`. The flag flips back automatically when the track resumes (re-stamp), and the wallclock-anchored tfdt lands the resumed segment at the live edge with no extra re-anchoring needed. Test `TestLiveTrackPresence_TrackDeath` (audio-dead, video-dead, draining-queue, recent-frames, pairing-no-downgrade, single-track cases).

---

### 2.5 Concurrency
Expand Down
57 changes: 55 additions & 2 deletions internal/publisher/dash/packager.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,17 @@ const (
videoPSAccumCap = 4 * 1024
)

// trackLossTimeout is how long a track may be silent (no frame arrival AND
// an empty queue) once the stream is Live before the segmenter treats it as
// dead and stops waiting for it. Without this, the init-presence flags
// (videoInit/audioInit) latch true for the session lifetime, so when one
// track dies mid-session the V/A duration coupling in buildCutDecision holds
// every cut forever (Ok=false) and the DASH live edge freezes. 6 s ≈ 2× the
// default pairing window — long enough to ride out a brief source stall, short
// enough that a genuine track death un-freezes the surviving track quickly.
// A package-level var (not const) so tests can shrink it. See liveTrackPresence.
var trackLossTimeout = 6 * time.Second

// Config carries the per-stream packager configuration.
type Config struct {
// StreamID is used for log fields. Free-form; not used in any file path.
Expand Down Expand Up @@ -145,6 +156,14 @@ type Packager struct {
vSegN uint64
aSegN uint64

// lastVideoFrameAt / lastAudioFrameAt record the wallclock arrival of
// the most recent accepted frame on each track. Once the stream is
// Live, liveTrackPresence uses these (plus queue depth) to declare a
// silent track dead after trackLossTimeout so the surviving track keeps
// cutting instead of freezing on the dead track's missing frames (A-8).
lastVideoFrameAt time.Time
lastAudioFrameAt time.Time

// pairingTruncated latches after truncateAtPairingLocked runs so it
// fires exactly once per session lifetime. Without this latch, a
// stream whose first Cut returns Ok=false (insufficient frames) would
Expand Down Expand Up @@ -382,6 +401,7 @@ func (p *Packager) handleH264(av *domain.AVPacket) {
DTSms: av.DTSms,
IsIDR: av.KeyFrame,
})
p.lastVideoFrameAt = time.Now()
if p.videoInit != nil {
p.state.OpenPairingWindow(time.Now())
}
Expand Down Expand Up @@ -460,6 +480,7 @@ func (p *Packager) handleAAC(av *domain.AVPacket) {
for _, f := range frames {
p.pushAudioWithDiag(f)
}
p.lastAudioFrameAt = time.Now()
if p.audioInit != nil {
p.state.OpenPairingWindow(time.Now())
}
Expand Down Expand Up @@ -606,6 +627,39 @@ func (p *Packager) onSessionBoundary() {
p.state.OnSessionBoundaryHandled()
}

// liveTrackPresence reports which tracks the segmenter should still wait
// for. The base answer is init-presence (a track exists once its init
// segment is built), but a track that has built its init can later die
// mid-session — the upstream stops delivering one elementary stream while
// the other keeps flowing. The init pointer never clears, so without this
// the V/A coupling in buildCutDecision would hold every cut forever and
// freeze the live edge (A-8).
//
// We only down-grade once the stream is Live: during WaitingForPairing the
// init-presence flags drive the pairing handshake (and queues are still
// filling), and during a SessionBoundary the queue was just reset so the
// arrival timestamps are intentionally stale. A track counts as alive while
// it has queued frames OR delivered one within trackLossTimeout; past that,
// with an empty queue, it is declared dead and the surviving track resumes
// cutting (audio death → video-only cuts; video death → cutAudioOnly). The
// flag flips back automatically when the track resumes (handlers re-stamp
// lastXFrameAt), and tfdt stays wallclock-anchored so the resumed segment
// lands at the right live-edge position with no extra re-anchoring.
func (p *Packager) liveTrackPresence(now time.Time) (haveVideo, haveAudio bool) {
haveVideo = p.videoInit != nil
haveAudio = p.audioInit != nil
if p.state.State() != StateLive {
return haveVideo, haveAudio
}
if haveVideo && p.queue.VideoLen() == 0 && now.Sub(p.lastVideoFrameAt) >= trackLossTimeout {
haveVideo = false
}
if haveAudio && p.queue.AudioLen() == 0 && now.Sub(p.lastAudioFrameAt) >= trackLossTimeout {
haveAudio = false
}
return haveVideo, haveAudio
}

// tryCut checks the segmenter at every tick. Holds the mutex throughout
// — segmenter decisions + writes are short and locking the entire path
// avoids state-tearing under concurrent onPacket.
Expand All @@ -617,8 +671,7 @@ func (p *Packager) tryCut(now time.Time) {
return
}

haveVideo := p.videoInit != nil
haveAudio := p.audioInit != nil
haveVideo, haveAudio := p.liveTrackPresence(now)

videoReady := haveVideo && p.queue.VideoLen() > 0
audioReady := haveAudio && p.queue.AudioLen() > 0
Expand Down
114 changes: 114 additions & 0 deletions internal/publisher/dash/packager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -832,6 +832,120 @@ func TestPackager_BehindPrevSegEnd(t *testing.T) {
})
}

// liveTrackTestPackager builds a minimal Live-state packager for
// liveTrackPresence tests: both init segments present (so the base
// presence flags are true), an empty queue, and the state machine
// advanced past WaitingForPairing to Live — the only state where
// track-death down-grading applies.
func liveTrackTestPackager() *Packager {
p := &Packager{
state: NewStateMachine(defaultPairingTimeout),
queue: NewFrameQueue(),
videoInit: &VideoInit{},
audioInit: &AudioInit{SampleRate: 48000},
}
p.state.OnFirstSegmentFlushed() // WaitingForPairing → Live
return p
}

// TestLiveTrackPresence_TrackDeath covers A-8: once a stream is Live, a
// track that has gone silent (no frame arrival within trackLossTimeout AND
// an empty queue) is declared dead so the surviving track keeps cutting.
// Before the fix, videoInit/audioInit latched the presence flags true for
// the session lifetime, so a mid-session track death froze the live edge
// (buildCutDecision's V/A coupling held every cut on the missing track).
func TestLiveTrackPresence_TrackDeath(t *testing.T) {
now := time.Date(2026, 1, 1, 3, 0, 0, 0, time.UTC)

t.Run("audio_silent_past_timeout_is_dead_video_survives", func(t *testing.T) {
p := liveTrackTestPackager()
p.lastVideoFrameAt = now.Add(-500 * time.Millisecond) // fresh
p.lastAudioFrameAt = now.Add(-2 * trackLossTimeout) // stale, queue empty
haveV, haveA := p.liveTrackPresence(now)
if !haveV {
t.Error("video should stay alive on a recent frame, got haveVideo=false")
}
if haveA {
t.Error("audio should be declared dead (silent past timeout, empty queue), got haveAudio=true")
}
})

t.Run("video_silent_past_timeout_is_dead_audio_survives", func(t *testing.T) {
p := liveTrackTestPackager()
p.lastVideoFrameAt = now.Add(-2 * trackLossTimeout)
p.lastAudioFrameAt = now.Add(-100 * time.Millisecond)
haveV, haveA := p.liveTrackPresence(now)
if haveV {
t.Error("video should be declared dead (silent past timeout, empty queue), got haveVideo=true")
}
if !haveA {
t.Error("audio should stay alive on a recent frame, got haveAudio=false")
}
})

t.Run("silent_track_with_queued_frames_still_draining_not_dead", func(t *testing.T) {
p := liveTrackTestPackager()
p.lastVideoFrameAt = now.Add(-2 * trackLossTimeout)
p.lastAudioFrameAt = now.Add(-2 * trackLossTimeout)
// Both stale by arrival, but the queues still hold buffered frames
// to drain — the segmenter must keep both until the queue empties.
p.queue.PushVideo(VideoFrame{AnnexB: []byte{0x00, 0x00, 0x00, 0x01}, IsIDR: true})
p.queue.PushAudio(AudioFrame{Raw: []byte{0xAA, 0xBB}})
haveV, haveA := p.liveTrackPresence(now)
if !haveV || !haveA {
t.Errorf("queued frames must keep tracks alive: haveVideo=%v haveAudio=%v", haveV, haveA)
}
})

t.Run("recent_frames_keep_both_tracks_alive", func(t *testing.T) {
p := liveTrackTestPackager()
p.lastVideoFrameAt = now.Add(-trackLossTimeout / 2)
p.lastAudioFrameAt = now.Add(-trackLossTimeout / 2)
haveV, haveA := p.liveTrackPresence(now)
if !haveV || !haveA {
t.Errorf("recent frames must keep tracks alive: haveVideo=%v haveAudio=%v", haveV, haveA)
}
})

t.Run("waiting_for_pairing_uses_init_presence_no_downgrade", func(t *testing.T) {
// Fresh state machine stays in WaitingForPairing. Even with both
// tracks stale + empty queues, the pairing handshake must not be
// disturbed — presence stays init-based.
p := &Packager{
state: NewStateMachine(defaultPairingTimeout),
queue: NewFrameQueue(),
videoInit: &VideoInit{},
audioInit: &AudioInit{SampleRate: 48000},
}
p.lastVideoFrameAt = now.Add(-2 * trackLossTimeout)
p.lastAudioFrameAt = now.Add(-2 * trackLossTimeout)
haveV, haveA := p.liveTrackPresence(now)
if !haveV || !haveA {
t.Errorf("WaitingForPairing must not down-grade tracks: haveVideo=%v haveAudio=%v", haveV, haveA)
}
})

t.Run("genuinely_single_track_stream_unaffected", func(t *testing.T) {
// Video-only stream (no audioInit) must keep haveAudio=false without
// the down-grade path ever mattering, and haveVideo must follow the
// usual liveness rule.
p := &Packager{
state: NewStateMachine(defaultPairingTimeout),
queue: NewFrameQueue(),
videoInit: &VideoInit{},
}
p.state.OnFirstSegmentFlushed()
p.lastVideoFrameAt = now.Add(-100 * time.Millisecond)
haveV, haveA := p.liveTrackPresence(now)
if !haveV {
t.Error("video-only stream with a recent frame should have haveVideo=true")
}
if haveA {
t.Error("video-only stream must have haveAudio=false (no audioInit)")
}
})
}

// ─── helpers ─────────────────────────────────────────────────────────

// waitForFile polls path until it exists or timeout elapses.
Expand Down