From cb86bdd910b2daafdd724c4a456850bd70062574 Mon Sep 17 00:00:00 2001 From: ntthuan060102github Date: Sat, 13 Jun 2026 17:50:33 +0700 Subject: [PATCH] fix(tsnorm): survive parse errors instead of permanently dropping to raw passthrough MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A single astits parse error (one truncated UDP datagram, a corrupt adaptation field) killed the tsnorm demux goroutine: runDemux returned, demuxDone closed, and every later Process returned io.EOF while n.started stayed true — the worker then fell back to un-normalised raw passthrough forever, with no self-heal (raw bytes keep flowing so the stall watchdog never fires). This broke DASH AST/tfdt + HLS timing with a mid-stream PID/timestamp-domain change (A-4). - runDemux rebuilds the demuxer on a parse error and resyncs on the next sync byte, capped at maxDemuxRestarts=8 (mirrors pull.TSDemuxPacketReader). - Process resets n.started when it observes demuxDone, so a goroutine that does give up is relaunched lazily on the next call. Regression test TestProcess_RecoversFromParseError. Fixes audit finding A-4. --- docs/audit/security-quality-audit.md | 2 ++ internal/ingestor/tsnorm/tsnorm.go | 33 +++++++++++++++++---- internal/ingestor/tsnorm/tsnorm_test.go | 39 +++++++++++++++++++++++++ 3 files changed, 68 insertions(+), 6 deletions(-) diff --git a/docs/audit/security-quality-audit.md b/docs/audit/security-quality-audit.md index 4f9afbd5..beaa3a2f 100644 --- a/docs/audit/security-quality-audit.md +++ b/docs/audit/security-quality-audit.md @@ -391,6 +391,8 @@ Covered as the HTTP-sink half of **S-2**. `batcher.go:414-446` POSTs to any oper --- #### A-4 (HIGH) — tsnorm demux goroutine death permanently disables TS normalisation, silently switches to raw passthrough +> ✅ **FIXED** in `fix/tsnorm-restart` — `runDemux` now rebuilds the astits demuxer on a parse error and resyncs (capped at `maxDemuxRestarts=8`, mirroring `pull.TSDemuxPacketReader`) instead of dying; and `Process` resets `n.started` when it observes `demuxDone`, so a goroutine that does give up is relaunched lazily on the next call. A transient corruption now costs at most one passthrough chunk, not permanent un-normalised output. Test `TestProcess_RecoversFromParseError`. + **Files:** `internal/ingestor/tsnorm/tsnorm.go:391-417` (`runDemux` exits on any non-EOF `NextData` error; `n.started` never reset), `:262-290` (`Process` returns io.EOF forever); `internal/ingestor/worker.go:553-574` (`writeRawTSChunk` falls back to raw passthrough per chunk). **Trigger:** Any astits parse error on a raw-TS source — e.g. one truncated UDP datagram (UDP default read buffer is exactly 1316 bytes / 188×7; larger datagrams truncate → permanent misalignment) or a corrupted adaptation field on a noisy multicast feed. `runDemux` logs at Debug and returns; every later `Process()` returns io.EOF; `writeRawTSChunk` then writes the **raw upstream chunk** for every chunk, forever. diff --git a/internal/ingestor/tsnorm/tsnorm.go b/internal/ingestor/tsnorm/tsnorm.go index 7399c738..acde215b 100644 --- a/internal/ingestor/tsnorm/tsnorm.go +++ b/internal/ingestor/tsnorm/tsnorm.go @@ -114,6 +114,12 @@ const muxerTablesRetransmitPeriod = 40 // its prefetch behaviour into the ack-timing logic). const tsPacketSize = 188 +// maxDemuxRestarts caps how many times runDemux rebuilds the astits demuxer +// after a parse error before giving up. Mirrors pull.TSDemuxPacketReader so a +// transient corruption (one truncated datagram, a bad adaptation field) resyncs +// instead of permanently killing normalisation. +const maxDemuxRestarts = 8 + // Normaliser wallclock-anchors the per-PES PTS / DTS values inside one // stream's MPEG-TS byte feed. NOT safe for concurrent Process calls — // the internal mutex serialises them; callers running in a single @@ -273,6 +279,7 @@ func (n *Normaliser) Process(chunk []byte) ([]byte, error) { select { case n.chunks <- chunk: case <-n.demuxDone: + n.started = false // demux goroutine died — relaunch lazily on the next Process (A-4) return nil, io.EOF } select { @@ -281,6 +288,7 @@ func (n *Normaliser) Process(chunk []byte) ([]byte, error) { // Drain whatever the goroutine did manage to emit before // dying — gives the worker a chance to surface a partial // segment instead of swallowing it on shutdown. + n.started = false // demux goroutine died — relaunch lazily on the next Process (A-4) if n.outBuf.Len() == 0 { return nil, io.EOF } @@ -388,19 +396,32 @@ func (n *Normaliser) runDemux() { astits.DemuxerOptPacketSize(tsPacketSize), ) + consecutiveErrors := 0 for { data, err := dmx.NextData() if err != nil { if errors.Is(err, astits.ErrNoMorePackets) || errors.Is(err, io.EOF) { return } - // Parse errors are recoverable in principle (the next PSI - // re-syncs the demuxer) but astits returns them as fatal. - // We log and bail — the caller's session-boundary path - // rebuilds us on the next OnSession. - slog.Debug("tsnorm: demuxer exit on error", "err", err) - return + // astits surfaces a parse error as fatal, but it's recoverable: + // rebuild the demuxer on the SAME reader so it resyncs on the next + // sync byte (mirrors pull.TSDemuxPacketReader). Before this, a single + // truncated UDP datagram or corrupt adaptation field killed this + // goroutine for good — every later Process returned io.EOF and the + // worker silently fell back to un-normalised raw passthrough with no + // self-heal (A-4). + consecutiveErrors++ + if consecutiveErrors > maxDemuxRestarts { + slog.Warn("tsnorm: demuxer giving up after repeated parse errors", + "err", err, "restarts", consecutiveErrors) + return + } + slog.Debug("tsnorm: demuxer parse error — rebuilding to resync", + "err", err, "restart", consecutiveErrors) + dmx = astits.NewDemuxer(n.ctx, rdr, astits.DemuxerOptPacketSize(tsPacketSize)) + continue } + consecutiveErrors = 0 switch { case data.PMT != nil: for _, es := range data.PMT.ElementaryStreams { diff --git a/internal/ingestor/tsnorm/tsnorm_test.go b/internal/ingestor/tsnorm/tsnorm_test.go index c384193a..784d7d04 100644 --- a/internal/ingestor/tsnorm/tsnorm_test.go +++ b/internal/ingestor/tsnorm/tsnorm_test.go @@ -3,6 +3,8 @@ package tsnorm_test import ( "bytes" "context" + "errors" + "io" "testing" gompeg2 "github.com/yapingcat/gomedia/go-mpeg2" @@ -687,3 +689,40 @@ func TestProcess_DiscontinuityIndicatorReseedsOnSession(t *testing.T) { t.Fatal("post-OnSession pulse must carry discontinuity_indicator=1 on video PID") } } + +// A-4: a parse error must not permanently kill normalisation. After a corrupt +// chunk the demuxer rebuilds and resyncs, so a later valid chunk is still +// normalised — instead of every later Process returning io.EOF and the worker +// silently falling back to un-normalised raw passthrough forever. +func TestProcess_RecoversFromParseError(t *testing.T) { + mkValid := func() []byte { + m := tsmux.NewFromAV(context.Background()) + var b bytes.Buffer + m.Write(&domain.AVPacket{ + Codec: domain.AVCodecH264, Data: buildH264IDR(), + PTSms: 1000, DTSms: 1000, KeyFrame: true, + }, func(p []byte) { b.Write(p) }) + return b.Bytes() + } + n := tsnorm.New(context.Background(), timeline.DefaultConfig()) + + if _, err := n.Process(mkValid()); err != nil { + t.Fatalf("Process(valid #1): %v", err) + } + // Corrupt 188-byte packet (no 0x47 sync) → astits parse error path. + if _, err := n.Process(make([]byte, 188)); err != nil && !errors.Is(err, io.EOF) { + t.Fatalf("Process(corrupt) unexpected err: %v", err) + } + // A subsequent valid chunk must still normalise — not a permanent io.EOF. + out, err := n.Process(mkValid()) + if errors.Is(err, io.EOF) { + t.Fatal("tsnorm returned io.EOF after a parse error — normalisation died permanently (A-4)") + } + if err != nil { + t.Fatalf("Process(valid #2): %v", err) + } + out = append(out, n.Flush()...) + if len(out) == 0 { + t.Fatal("tsnorm produced no output after recovering from a parse error") + } +}