From 96570d9d6402b67b28f9bdfa2d2dddc0d3ff9fd6 Mon Sep 17 00:00:00 2001 From: ntthuan060102github Date: Sat, 13 Jun 2026 18:05:12 +0700 Subject: [PATCH] fix(dash): un-latch track presence so the live edge survives a mid-session track death MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The DASH packager derived haveVideo/haveAudio from videoInit/audioInit, which are built once and never cleared. When one elementary stream stopped mid-session (encoder fault, or a failover swap to a single-track feed while the publisher keeps running by design) the presence flag stayed true, so buildCutDecision's V/A duration coupling held every cut waiting for the dead track's frames — Ok=false on every tick. The live edge froze permanently for all DASH viewers, the frame queue saturated and overflow-dropped, and only a manual restart recovered it. HLS on the same stream kept working, masking it. Derive presence from arrival liveness instead: handleH264/handleAAC stamp the last-frame-arrival wallclock per track, and a new liveTrackPresence(now) declares a track dead once the stream is Live and that track has both an empty queue and no arrival within trackLossTimeout (6 s). Down-grading is gated to StateLive only, so the pairing handshake and post-session-boundary stale timestamps are untouched. Audio death then skips the coupling and video cuts resume; video death routes to cutAudioOnly. Presence flips back automatically when the track resumes, and the wallclock-anchored tfdt lands the resumed segment at the live edge with no extra re-anchoring. Test TestLiveTrackPresence_TrackDeath covers audio-dead, video-dead, still-draining-queue, recent-frames, waiting-for-pairing (no down-grade), and genuine single-track cases. --- api/docs/docs.go | 16 ++-- api/docs/swagger.json | 16 ++-- api/docs/swagger.yaml | 12 +-- docs/audit/security-quality-audit.md | 2 + internal/publisher/dash/packager.go | 57 +++++++++++- internal/publisher/dash/packager_test.go | 114 +++++++++++++++++++++++ 6 files changed, 193 insertions(+), 24 deletions(-) diff --git a/api/docs/docs.go b/api/docs/docs.go index 907d4cd5..324acdbd 100644 --- a/api/docs/docs.go +++ b/api/docs/docs.go @@ -2435,8 +2435,6 @@ const docTemplate = `{ "domain.EventType": { "type": "string", "enum": [ - "session.opened", - "session.closed", "stream.created", "stream.updated", "stream.started", @@ -2470,7 +2468,9 @@ const docTemplate = `{ "template.updated", "template.deleted", "stream.runtime_created", - "stream.runtime_expired" + "stream.runtime_expired", + "session.opened", + "session.closed" ], "x-enum-comments": { "EventDVRSegmentPruned": "retention loop deleted an aged-out segment", @@ -2487,8 +2487,6 @@ const docTemplate = `{ "EventStreamUpdated": "PUT /streams/{code} on existing record" }, "x-enum-descriptions": [ - "", - "", "", "PUT /streams/{code} on existing record", "", @@ -2522,11 +2520,11 @@ const docTemplate = `{ "", "", "", + "", + "", "" ], "x-enum-varnames": [ - "EventSessionOpened", - "EventSessionClosed", "EventStreamCreated", "EventStreamUpdated", "EventStreamStarted", @@ -2560,7 +2558,9 @@ const docTemplate = `{ "EventTemplateUpdated", "EventTemplateDeleted", "EventStreamRuntimeCreated", - "EventStreamRuntimeExpired" + "EventStreamRuntimeExpired", + "EventSessionOpened", + "EventSessionClosed" ] }, "domain.GlobalConfig": { diff --git a/api/docs/swagger.json b/api/docs/swagger.json index 448f8406..26e027e9 100644 --- a/api/docs/swagger.json +++ b/api/docs/swagger.json @@ -2428,8 +2428,6 @@ "domain.EventType": { "type": "string", "enum": [ - "session.opened", - "session.closed", "stream.created", "stream.updated", "stream.started", @@ -2463,7 +2461,9 @@ "template.updated", "template.deleted", "stream.runtime_created", - "stream.runtime_expired" + "stream.runtime_expired", + "session.opened", + "session.closed" ], "x-enum-comments": { "EventDVRSegmentPruned": "retention loop deleted an aged-out segment", @@ -2480,8 +2480,6 @@ "EventStreamUpdated": "PUT /streams/{code} on existing record" }, "x-enum-descriptions": [ - "", - "", "", "PUT /streams/{code} on existing record", "", @@ -2515,11 +2513,11 @@ "", "", "", + "", + "", "" ], "x-enum-varnames": [ - "EventSessionOpened", - "EventSessionClosed", "EventStreamCreated", "EventStreamUpdated", "EventStreamStarted", @@ -2553,7 +2551,9 @@ "EventTemplateUpdated", "EventTemplateDeleted", "EventStreamRuntimeCreated", - "EventStreamRuntimeExpired" + "EventStreamRuntimeExpired", + "EventSessionOpened", + "EventSessionClosed" ] }, "domain.GlobalConfig": { diff --git a/api/docs/swagger.yaml b/api/docs/swagger.yaml index 26e2c092..ffd463a2 100644 --- a/api/docs/swagger.yaml +++ b/api/docs/swagger.yaml @@ -629,8 +629,6 @@ definitions: type: object domain.EventType: enum: - - session.opened - - session.closed - stream.created - stream.updated - stream.started @@ -665,6 +663,8 @@ definitions: - template.deleted - stream.runtime_created - stream.runtime_expired + - session.opened + - session.closed type: string x-enum-comments: EventDVRSegmentPruned: retention loop deleted an aged-out segment @@ -682,8 +682,6 @@ definitions: EventStreamUpdated: PUT /streams/{code} on existing record x-enum-descriptions: - "" - - "" - - "" - PUT /streams/{code} on existing record - "" - "" @@ -717,9 +715,9 @@ definitions: - "" - "" - "" + - "" + - "" x-enum-varnames: - - EventSessionOpened - - EventSessionClosed - EventStreamCreated - EventStreamUpdated - EventStreamStarted @@ -754,6 +752,8 @@ definitions: - EventTemplateDeleted - EventStreamRuntimeCreated - EventStreamRuntimeExpired + - EventSessionOpened + - EventSessionClosed domain.GlobalConfig: properties: buffer: diff --git a/docs/audit/security-quality-audit.md b/docs/audit/security-quality-audit.md index 4f9afbd5..a350ed8d 100644 --- a/docs/audit/security-quality-audit.md +++ b/docs/audit/security-quality-audit.md @@ -447,6 +447,8 @@ Covered as the HTTP-sink half of **S-2**. `batcher.go:414-446` POSTs to any oper **Fix:** Un-latch track presence based on arrival liveness rather than init existence: record `lastVideoFrameAt`/`lastAudioFrameAt`; once past `WaitingForPairing`, compute `haveX := initX != nil && (queueLen>0 || now-lastXFrameAt < trackLossTimeout)` with `trackLossTimeout ≈ 6 s` (the existing safety-net deadline). Audio death → `haveAudio=false` → coupling skipped → video cuts resume; video death → `haveVideo=false` → `cutAudioOnly` engages. On a declared-dead track's first resumed frame, re-anchor its next segment tfdt to wallclock so the outage gap isn't baked in as permanent A/V desync. +> ✅ **FIXED** in `fix/dash-track-death` — `tryCut` now derives `haveVideo`/`haveAudio` from a new `liveTrackPresence(now)` instead of the latched `videoInit`/`audioInit` pointers. `handleH264`/`handleAAC` stamp `lastVideoFrameAt`/`lastAudioFrameAt` on every accepted frame; once the stream is `StateLive` (only — `WaitingForPairing`/`SessionBoundary` keep init-presence so the pairing handshake and post-reset stale timestamps aren't disturbed), a track with an empty queue and no arrival within `trackLossTimeout` (6 s, a package var so tests can shrink it) is declared dead. Audio death → coupling skipped (`buildCutDecision` audio block gates on `haveAudio`) → video cuts resume; video death → `cutAudioOnly`. The flag flips back automatically when the track resumes (re-stamp), and the wallclock-anchored tfdt lands the resumed segment at the live edge with no extra re-anchoring needed. Test `TestLiveTrackPresence_TrackDeath` (audio-dead, video-dead, draining-queue, recent-frames, pairing-no-downgrade, single-track cases). + --- ### 2.5 Concurrency diff --git a/internal/publisher/dash/packager.go b/internal/publisher/dash/packager.go index 58b0de91..f89f1d48 100644 --- a/internal/publisher/dash/packager.go +++ b/internal/publisher/dash/packager.go @@ -60,6 +60,17 @@ const ( videoPSAccumCap = 4 * 1024 ) +// trackLossTimeout is how long a track may be silent (no frame arrival AND +// an empty queue) once the stream is Live before the segmenter treats it as +// dead and stops waiting for it. Without this, the init-presence flags +// (videoInit/audioInit) latch true for the session lifetime, so when one +// track dies mid-session the V/A duration coupling in buildCutDecision holds +// every cut forever (Ok=false) and the DASH live edge freezes. 6 s ≈ 2× the +// default pairing window — long enough to ride out a brief source stall, short +// enough that a genuine track death un-freezes the surviving track quickly. +// A package-level var (not const) so tests can shrink it. See liveTrackPresence. +var trackLossTimeout = 6 * time.Second + // Config carries the per-stream packager configuration. type Config struct { // StreamID is used for log fields. Free-form; not used in any file path. @@ -145,6 +156,14 @@ type Packager struct { vSegN uint64 aSegN uint64 + // lastVideoFrameAt / lastAudioFrameAt record the wallclock arrival of + // the most recent accepted frame on each track. Once the stream is + // Live, liveTrackPresence uses these (plus queue depth) to declare a + // silent track dead after trackLossTimeout so the surviving track keeps + // cutting instead of freezing on the dead track's missing frames (A-8). + lastVideoFrameAt time.Time + lastAudioFrameAt time.Time + // pairingTruncated latches after truncateAtPairingLocked runs so it // fires exactly once per session lifetime. Without this latch, a // stream whose first Cut returns Ok=false (insufficient frames) would @@ -382,6 +401,7 @@ func (p *Packager) handleH264(av *domain.AVPacket) { DTSms: av.DTSms, IsIDR: av.KeyFrame, }) + p.lastVideoFrameAt = time.Now() if p.videoInit != nil { p.state.OpenPairingWindow(time.Now()) } @@ -460,6 +480,7 @@ func (p *Packager) handleAAC(av *domain.AVPacket) { for _, f := range frames { p.pushAudioWithDiag(f) } + p.lastAudioFrameAt = time.Now() if p.audioInit != nil { p.state.OpenPairingWindow(time.Now()) } @@ -606,6 +627,39 @@ func (p *Packager) onSessionBoundary() { p.state.OnSessionBoundaryHandled() } +// liveTrackPresence reports which tracks the segmenter should still wait +// for. The base answer is init-presence (a track exists once its init +// segment is built), but a track that has built its init can later die +// mid-session — the upstream stops delivering one elementary stream while +// the other keeps flowing. The init pointer never clears, so without this +// the V/A coupling in buildCutDecision would hold every cut forever and +// freeze the live edge (A-8). +// +// We only down-grade once the stream is Live: during WaitingForPairing the +// init-presence flags drive the pairing handshake (and queues are still +// filling), and during a SessionBoundary the queue was just reset so the +// arrival timestamps are intentionally stale. A track counts as alive while +// it has queued frames OR delivered one within trackLossTimeout; past that, +// with an empty queue, it is declared dead and the surviving track resumes +// cutting (audio death → video-only cuts; video death → cutAudioOnly). The +// flag flips back automatically when the track resumes (handlers re-stamp +// lastXFrameAt), and tfdt stays wallclock-anchored so the resumed segment +// lands at the right live-edge position with no extra re-anchoring. +func (p *Packager) liveTrackPresence(now time.Time) (haveVideo, haveAudio bool) { + haveVideo = p.videoInit != nil + haveAudio = p.audioInit != nil + if p.state.State() != StateLive { + return haveVideo, haveAudio + } + if haveVideo && p.queue.VideoLen() == 0 && now.Sub(p.lastVideoFrameAt) >= trackLossTimeout { + haveVideo = false + } + if haveAudio && p.queue.AudioLen() == 0 && now.Sub(p.lastAudioFrameAt) >= trackLossTimeout { + haveAudio = false + } + return haveVideo, haveAudio +} + // tryCut checks the segmenter at every tick. Holds the mutex throughout // — segmenter decisions + writes are short and locking the entire path // avoids state-tearing under concurrent onPacket. @@ -617,8 +671,7 @@ func (p *Packager) tryCut(now time.Time) { return } - haveVideo := p.videoInit != nil - haveAudio := p.audioInit != nil + haveVideo, haveAudio := p.liveTrackPresence(now) videoReady := haveVideo && p.queue.VideoLen() > 0 audioReady := haveAudio && p.queue.AudioLen() > 0 diff --git a/internal/publisher/dash/packager_test.go b/internal/publisher/dash/packager_test.go index 3a98752d..c7bf19ce 100644 --- a/internal/publisher/dash/packager_test.go +++ b/internal/publisher/dash/packager_test.go @@ -832,6 +832,120 @@ func TestPackager_BehindPrevSegEnd(t *testing.T) { }) } +// liveTrackTestPackager builds a minimal Live-state packager for +// liveTrackPresence tests: both init segments present (so the base +// presence flags are true), an empty queue, and the state machine +// advanced past WaitingForPairing to Live — the only state where +// track-death down-grading applies. +func liveTrackTestPackager() *Packager { + p := &Packager{ + state: NewStateMachine(defaultPairingTimeout), + queue: NewFrameQueue(), + videoInit: &VideoInit{}, + audioInit: &AudioInit{SampleRate: 48000}, + } + p.state.OnFirstSegmentFlushed() // WaitingForPairing → Live + return p +} + +// TestLiveTrackPresence_TrackDeath covers A-8: once a stream is Live, a +// track that has gone silent (no frame arrival within trackLossTimeout AND +// an empty queue) is declared dead so the surviving track keeps cutting. +// Before the fix, videoInit/audioInit latched the presence flags true for +// the session lifetime, so a mid-session track death froze the live edge +// (buildCutDecision's V/A coupling held every cut on the missing track). +func TestLiveTrackPresence_TrackDeath(t *testing.T) { + now := time.Date(2026, 1, 1, 3, 0, 0, 0, time.UTC) + + t.Run("audio_silent_past_timeout_is_dead_video_survives", func(t *testing.T) { + p := liveTrackTestPackager() + p.lastVideoFrameAt = now.Add(-500 * time.Millisecond) // fresh + p.lastAudioFrameAt = now.Add(-2 * trackLossTimeout) // stale, queue empty + haveV, haveA := p.liveTrackPresence(now) + if !haveV { + t.Error("video should stay alive on a recent frame, got haveVideo=false") + } + if haveA { + t.Error("audio should be declared dead (silent past timeout, empty queue), got haveAudio=true") + } + }) + + t.Run("video_silent_past_timeout_is_dead_audio_survives", func(t *testing.T) { + p := liveTrackTestPackager() + p.lastVideoFrameAt = now.Add(-2 * trackLossTimeout) + p.lastAudioFrameAt = now.Add(-100 * time.Millisecond) + haveV, haveA := p.liveTrackPresence(now) + if haveV { + t.Error("video should be declared dead (silent past timeout, empty queue), got haveVideo=true") + } + if !haveA { + t.Error("audio should stay alive on a recent frame, got haveAudio=false") + } + }) + + t.Run("silent_track_with_queued_frames_still_draining_not_dead", func(t *testing.T) { + p := liveTrackTestPackager() + p.lastVideoFrameAt = now.Add(-2 * trackLossTimeout) + p.lastAudioFrameAt = now.Add(-2 * trackLossTimeout) + // Both stale by arrival, but the queues still hold buffered frames + // to drain — the segmenter must keep both until the queue empties. + p.queue.PushVideo(VideoFrame{AnnexB: []byte{0x00, 0x00, 0x00, 0x01}, IsIDR: true}) + p.queue.PushAudio(AudioFrame{Raw: []byte{0xAA, 0xBB}}) + haveV, haveA := p.liveTrackPresence(now) + if !haveV || !haveA { + t.Errorf("queued frames must keep tracks alive: haveVideo=%v haveAudio=%v", haveV, haveA) + } + }) + + t.Run("recent_frames_keep_both_tracks_alive", func(t *testing.T) { + p := liveTrackTestPackager() + p.lastVideoFrameAt = now.Add(-trackLossTimeout / 2) + p.lastAudioFrameAt = now.Add(-trackLossTimeout / 2) + haveV, haveA := p.liveTrackPresence(now) + if !haveV || !haveA { + t.Errorf("recent frames must keep tracks alive: haveVideo=%v haveAudio=%v", haveV, haveA) + } + }) + + t.Run("waiting_for_pairing_uses_init_presence_no_downgrade", func(t *testing.T) { + // Fresh state machine stays in WaitingForPairing. Even with both + // tracks stale + empty queues, the pairing handshake must not be + // disturbed — presence stays init-based. + p := &Packager{ + state: NewStateMachine(defaultPairingTimeout), + queue: NewFrameQueue(), + videoInit: &VideoInit{}, + audioInit: &AudioInit{SampleRate: 48000}, + } + p.lastVideoFrameAt = now.Add(-2 * trackLossTimeout) + p.lastAudioFrameAt = now.Add(-2 * trackLossTimeout) + haveV, haveA := p.liveTrackPresence(now) + if !haveV || !haveA { + t.Errorf("WaitingForPairing must not down-grade tracks: haveVideo=%v haveAudio=%v", haveV, haveA) + } + }) + + t.Run("genuinely_single_track_stream_unaffected", func(t *testing.T) { + // Video-only stream (no audioInit) must keep haveAudio=false without + // the down-grade path ever mattering, and haveVideo must follow the + // usual liveness rule. + p := &Packager{ + state: NewStateMachine(defaultPairingTimeout), + queue: NewFrameQueue(), + videoInit: &VideoInit{}, + } + p.state.OnFirstSegmentFlushed() + p.lastVideoFrameAt = now.Add(-100 * time.Millisecond) + haveV, haveA := p.liveTrackPresence(now) + if !haveV { + t.Error("video-only stream with a recent frame should have haveVideo=true") + } + if haveA { + t.Error("video-only stream must have haveAudio=false (no audioInit)") + } + }) +} + // ─── helpers ───────────────────────────────────────────────────────── // waitForFile polls path until it exists or timeout elapses.