Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/audit/security-quality-audit.md
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,8 @@ Covered as the HTTP-sink half of **S-2**. `batcher.go:414-446` POSTs to any oper
---

#### A-4 (HIGH) — tsnorm demux goroutine death permanently disables TS normalisation, silently switches to raw passthrough
> ✅ **FIXED** in `fix/tsnorm-restart`. `runDemux` now rebuilds the astits demuxer on a parse error and resyncs instead of dying (capped at `maxDemuxRestarts=8` consecutive errors, mirroring `pull.TSDemuxPacketReader`). `Process` also resets `n.started` when it observes `demuxDone`, so a goroutine that does give up is relaunched lazily on the next call. A transient corruption now costs at most one passthrough chunk, not permanent un-normalised output. Test: `TestProcess_RecoversFromParseError`.

**Files:** `internal/ingestor/tsnorm/tsnorm.go:391-417` (`runDemux` exits on any non-EOF `NextData` error; `n.started` never reset), `:262-290` (`Process` returns io.EOF forever); `internal/ingestor/worker.go:553-574` (`writeRawTSChunk` falls back to raw passthrough per chunk).

**Trigger:** Any astits parse error on a raw-TS source — e.g. one truncated UDP datagram (UDP default read buffer is exactly 1316 bytes / 188×7; larger datagrams truncate → permanent misalignment) or a corrupted adaptation field on a noisy multicast feed. `runDemux` logs at Debug and returns; every later `Process()` returns io.EOF; `writeRawTSChunk` then writes the **raw upstream chunk** for every chunk, forever.
Expand Down
33 changes: 27 additions & 6 deletions internal/ingestor/tsnorm/tsnorm.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@ const muxerTablesRetransmitPeriod = 40
// its prefetch behaviour into the ack-timing logic).
const tsPacketSize = 188

// maxDemuxRestarts caps how many consecutive parse errors runDemux absorbs by
// rebuilding the astits demuxer before it gives up and exits. The count is
// reset after every successful NextData, so only an unbroken run of failures
// can exhaust it. Mirrors pull.TSDemuxPacketReader so a transient corruption
// (one truncated datagram, a bad adaptation field) resyncs instead of
// permanently killing normalisation.
const maxDemuxRestarts = 8

// Normaliser wallclock-anchors the per-PES PTS / DTS values inside one
// stream's MPEG-TS byte feed. NOT safe for concurrent Process calls —
// the internal mutex serialises them; callers running in a single
Expand Down Expand Up @@ -273,6 +279,7 @@ func (n *Normaliser) Process(chunk []byte) ([]byte, error) {
select {
case n.chunks <- chunk:
case <-n.demuxDone:
n.started = false // demux goroutine died — relaunch lazily on the next Process (A-4)
return nil, io.EOF
}
select {
Expand All @@ -281,6 +288,7 @@ func (n *Normaliser) Process(chunk []byte) ([]byte, error) {
// Drain whatever the goroutine did manage to emit before
// dying — gives the worker a chance to surface a partial
// segment instead of swallowing it on shutdown.
n.started = false // demux goroutine died — relaunch lazily on the next Process (A-4)
if n.outBuf.Len() == 0 {
return nil, io.EOF
}
Expand Down Expand Up @@ -388,19 +396,32 @@ func (n *Normaliser) runDemux() {
astits.DemuxerOptPacketSize(tsPacketSize),
)

consecutiveErrors := 0
for {
data, err := dmx.NextData()
if err != nil {
if errors.Is(err, astits.ErrNoMorePackets) || errors.Is(err, io.EOF) {
return
}
// Parse errors are recoverable in principle (the next PSI
// re-syncs the demuxer) but astits returns them as fatal.
// We log and bail — the caller's session-boundary path
// rebuilds us on the next OnSession.
slog.Debug("tsnorm: demuxer exit on error", "err", err)
return
// astits surfaces a parse error as fatal, but it's recoverable:
// rebuild the demuxer on the SAME reader so it resyncs on the next
// sync byte (mirrors pull.TSDemuxPacketReader). Before this, a single
// truncated UDP datagram or corrupt adaptation field killed this
// goroutine for good — every later Process returned io.EOF and the
// worker silently fell back to un-normalised raw passthrough with no
// self-heal (A-4).
consecutiveErrors++
if consecutiveErrors > maxDemuxRestarts {
slog.Warn("tsnorm: demuxer giving up after repeated parse errors",
"err", err, "restarts", consecutiveErrors)
return
}
slog.Debug("tsnorm: demuxer parse error — rebuilding to resync",
"err", err, "restart", consecutiveErrors)
dmx = astits.NewDemuxer(n.ctx, rdr, astits.DemuxerOptPacketSize(tsPacketSize))
continue
}
consecutiveErrors = 0
switch {
case data.PMT != nil:
for _, es := range data.PMT.ElementaryStreams {
Expand Down
39 changes: 39 additions & 0 deletions internal/ingestor/tsnorm/tsnorm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package tsnorm_test
import (
"bytes"
"context"
"errors"
"io"
"testing"

gompeg2 "github.com/yapingcat/gomedia/go-mpeg2"
Expand Down Expand Up @@ -687,3 +689,40 @@ func TestProcess_DiscontinuityIndicatorReseedsOnSession(t *testing.T) {
t.Fatal("post-OnSession pulse must carry discontinuity_indicator=1 on video PID")
}
}

// TestProcess_RecoversFromParseError is the regression test for audit item
// A-4: one unparseable chunk must not disable normalisation for good. The
// sequence is valid chunk → corrupt chunk → valid chunk; the final Process
// must not report io.EOF, and together with Flush it must yield some output.
func TestProcess_RecoversFromParseError(t *testing.T) {
	// validChunk muxes a single H.264 keyframe packet into a fresh TS byte
	// slice, using a new muxer on every call.
	validChunk := func() []byte {
		mux := tsmux.NewFromAV(context.Background())
		var ts bytes.Buffer
		pkt := &domain.AVPacket{
			Codec:    domain.AVCodecH264,
			Data:     buildH264IDR(),
			PTSms:    1000,
			DTSms:    1000,
			KeyFrame: true,
		}
		mux.Write(pkt, func(p []byte) { ts.Write(p) })
		return ts.Bytes()
	}

	norm := tsnorm.New(context.Background(), timeline.DefaultConfig())

	// Step 1: a clean chunk must be accepted without error.
	if _, err := norm.Process(validChunk()); err != nil {
		t.Fatalf("Process(valid #1): %v", err)
	}

	// Step 2: feed one all-zero 188-byte packet. It has no 0x47 sync byte, so
	// it is meant to drive the demuxer's parse-error path. io.EOF is tolerated
	// here; any other error fails the test.
	garbage := make([]byte, 188)
	_, corruptErr := norm.Process(garbage)
	if corruptErr != nil && !errors.Is(corruptErr, io.EOF) {
		t.Fatalf("Process(corrupt) unexpected err: %v", corruptErr)
	}

	// Step 3: the next clean chunk must be processed again. An io.EOF here
	// means normalisation never came back.
	out, err := norm.Process(validChunk())
	switch {
	case errors.Is(err, io.EOF):
		t.Fatal("tsnorm returned io.EOF after a parse error — normalisation died permanently (A-4)")
	case err != nil:
		t.Fatalf("Process(valid #2): %v", err)
	}

	// Include whatever Flush still returns before checking for output.
	if out = append(out, norm.Flush()...); len(out) == 0 {
		t.Fatal("tsnorm produced no output after recovering from a parse error")
	}
}