Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/audit/security-quality-audit.md
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,8 @@ Covered as the HTTP-sink half of **S-2**. `batcher.go:414-446` POSTs to any oper
---

#### D-1 (HIGH) — DVR catalog wiped on every recording restart → orphaned blobs, retention defeated, unbounded disk growth
> ✅ **FIXED** in `fix/dvr-data-loss` — `StartRecording` now loads the prior catalog via `LoadCatalog`, and `mergePriorCoverage` carries its `Hours`/`Available`/`Gaps` (plus any profiles present only in the old catalog) into the fresh one, so `pruneOnce` sees and prunes pre-restart hours (retention is wall- and size-anchored, so it stays correct regardless of the per-run media origin). The new run anchors its own media origin so recent timeshift keeps working; playing back **across** a restart boundary still needs a per-hour reader anchor (noted follow-up). Test: `TestMergePriorCoverage`.

**Files:** `internal/dvr/blob/service.go:96/100` (`newCatalog` + `Save`, `:251-274`); `LoadCatalog` only at `reader.go:48`; retention `retention.go:48-93`; `recovery.go:50-70` (`RepairStream` only seals `.open`-sentinel crash-dirty hours); trigger `coordinator.go:360/745-748`.
**Merges:** two reported findings (identical root cause).

Expand All @@ -339,6 +341,8 @@ Covered as the HTTP-sink half of **S-2**. `batcher.go:414-446` POSTs to any oper
---

#### D-2 (HIGH) — Raw-TS / passthrough streams record ZERO bytes while `recording_status` reports "recording"
> ✅ **FIXED (fail-loud)** in `fix/dvr-data-loss` — `blobProfiles` now refuses a DVR lane for a non-transcoded raw-TS source (`StreamMainBufferIsTS`), so no catalog or Recording row is created and the status no longer lies. Making raw-TS DVR actually record (wire `pkt.TS` demux → AVPacket → `ingestAV`) is a deliberate follow-up. Test `TestBlobProfiles_RefusesRawTSSource`.

**Files:** `internal/dvr/blob/writer.go:102-120` (`Ingest` handles only `pkt.AV`; `pkt.TS` is an explicit unimplemented stub at `:117-118`); `service.go:100-113` (catalog + Recording row saved unconditionally); `coordinator.go:780-784` (`blobProfiles` single `p0` lane on `stream.Code`); status `blob_timeshift.go:45-48`.

**Trigger:** Enable DVR on any non-transcoded stream whose ingest writes raw MPEG-TS into the buffer hub: UDP multicast, HLS-pull, HTTP-TS, SRT, or file. `profileWriter.Ingest` discards `pkt.TS` silently (returns nil), so no blob/fragment is ever written.
Expand Down
10 changes: 10 additions & 0 deletions internal/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,16 @@ func (c *Coordinator) stopDVR(ctx context.Context, code domain.StreamCode) {
func (c *Coordinator) blobProfiles(stream *domain.Stream) []blob.ProfileSub {
rends := buffer.RenditionsForTranscoder(stream.Code, stream.Transcoder)
if len(rends) == 0 {
// Non-transcoded: the p0 lane records the stream's main buffer, but the
// blob writer only ingests AVPackets. A raw-TS main buffer (UDP /
// HLS-pull / HTTP-TS / SRT / file) records ZERO bytes while the status
// still reports "recording" (D-2). Refuse loudly so no catalog or
// Recording row is created and the status never lies.
if domain.StreamMainBufferIsTS(stream) {
slog.Warn("coordinator: DVR unsupported for a raw-TS source (writer records nothing) — skipping recording",
"stream_code", stream.Code)
return nil
}
return []blob.ProfileSub{{ID: "p0", BufferID: stream.Code, IsAudioSource: true}}
}
best := buffer.BestRenditionIndex(rends)
Expand Down
14 changes: 14 additions & 0 deletions internal/coordinator/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,3 +196,17 @@ func TestReconcileOnceStartsTemplateInheritedInputs(t *testing.T) {
assert.True(t, h.coord.IsRunning("inherits"),
"stream inheriting Inputs from a template must be started (B-5)")
}

// TestBlobProfiles_RefusesRawTSSource pins the D-2 fix. A non-transcoded
// source whose main buffer carries raw TS (UDP / HLS-pull / HTTP-TS / SRT /
// file) used to be given a DVR lane even though the blob writer cannot ingest
// TS chunks, so it recorded ZERO bytes while the status claimed "recording".
// blobProfiles must hand back no lane for such a stream (hence no catalog and
// no Recording row), while a non-transcoded AV source keeps its p0 lane.
func TestBlobProfiles_RefusesRawTSSource(t *testing.T) {
	t.Parallel()
	h := newHarness(t)

	// Raw-TS ingest: UDP multicast, no transcoder configured.
	udpStream := &domain.Stream{
		Code:   "udp-dvr",
		DVR:    &domain.StreamDVRConfig{Enabled: true},
		Inputs: []domain.Input{{URL: "udp://239.0.0.1:1234"}},
	}
	udpLanes := h.coord.blobProfiles(udpStream)
	require.Nil(t, udpLanes, "raw-TS source must not get a DVR lane (D-2)")

	// AV ingest: RTMP, no transcoder configured.
	rtmpStream := &domain.Stream{
		Code:   "rtmp-dvr",
		DVR:    &domain.StreamDVRConfig{Enabled: true},
		Inputs: []domain.Input{{URL: "rtmp://host/app/key"}},
	}
	rtmpLanes := h.coord.blobProfiles(rtmpStream)
	require.NotEmpty(t, rtmpLanes, "non-transcoded AV source records via the p0 lane")
}
46 changes: 45 additions & 1 deletion internal/dvr/blob/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,24 @@ func (s *Service) StartRecording(ctx context.Context, code domain.StreamCode, pr
return nil, err
}

cat := newCatalog(code, profiles, cfg)
// Merge prior on-disk coverage so retention sees (and prunes) pre-restart
// hours instead of orphaning them. A fresh empty catalog overwriting
// catalog.json on every restart defeated retention and grew disk without
// bound (D-1). Hours/Available/Gaps are wall- and size-anchored, so
// retention stays correct regardless of the per-run media origin; the new
// run anchors its own origin, so recent timeshift keeps working.
if prev, ok, lerr := LoadCatalog(streamDir); lerr == nil && ok && prev.Format == CatalogFormat {
mergePriorCoverage(cat, prev)
}

s.mu.Lock()
if _, ok := s.active[code]; ok {
s.mu.Unlock()
return nil, fmt.Errorf("blob: already recording %s", code)
}
rctx, cancel := context.WithCancel(context.WithoutCancel(ctx)) //nolint:contextcheck // lane outlives the request
rec := &recording{streamDir: streamDir, cancel: cancel, cat: newCatalog(code, profiles, cfg)}
rec := &recording{streamDir: streamDir, cancel: cancel, cat: cat}
s.active[code] = rec
s.mu.Unlock()

Expand Down Expand Up @@ -273,6 +284,39 @@ func newCatalog(code domain.StreamCode, profiles []ProfileSub, cfg *domain.Strea
return cat
}

// mergePriorCoverage folds the coverage recorded in prev (the catalog saved by
// an earlier run of the same stream) into cat (the catalog just built for the
// new run). Without this, every restart replaced the saved catalog with an
// empty one, leaving pre-restart blobs orphaned and invisible to retention so
// disk usage grew without bound (D-1).
//
// What is carried over:
//   - for every profile ID present in both catalogs, prev's Hours and
//     Available replace the ones on cat's profile; all other fields of cat's
//     profile are left as built;
//   - every prev profile whose ID is absent from cat (e.g. a removed or
//     renamed ladder rung) is appended to cat.Profiles as-is, so its hours
//     remain listed and prunable;
//   - cat.Gaps is replaced by prev.Gaps.
//
// The per-run media origin is intentionally not carried: the new run anchors
// its own origin so recent timeshift keeps working. Playing back across the
// restart boundary needs a per-hour reader anchor (follow-up).
//
// The carried slices are shared with prev, not copied. Both arguments must be
// non-nil.
func mergePriorCoverage(cat, prev *Catalog) {
	// Index the prior profiles by ID; a later duplicate ID overrides an
	// earlier one.
	old := make(map[string]*ProfileDesc, len(prev.Profiles))
	for i := range prev.Profiles {
		pd := &prev.Profiles[i]
		old[pd.ID] = pd
	}

	// Adopt prior coverage for profiles that still exist, remembering which
	// IDs the new catalog was built with.
	current := make(map[string]bool, len(cat.Profiles))
	for i := range cat.Profiles {
		dst := &cat.Profiles[i]
		current[dst.ID] = true
		src, found := old[dst.ID]
		if !found {
			continue
		}
		dst.Hours, dst.Available = src.Hours, src.Available
	}

	// Keep profiles that exist only in the prior catalog, in their prior order.
	for i := range prev.Profiles {
		if current[prev.Profiles[i].ID] {
			continue
		}
		cat.Profiles = append(cat.Profiles, prev.Profiles[i])
	}

	cat.Gaps = prev.Gaps
}

// segDurFromCfg resolves the fragment target duration.
func segDurFromCfg(cfg *domain.StreamDVRConfig) time.Duration {
if cfg != nil && cfg.SegmentDuration > 0 {
Expand Down
40 changes: 40 additions & 0 deletions internal/dvr/blob/service_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package blob

import (
"testing"

"github.com/stretchr/testify/require"
)

// D-1: a restart must not wipe the record of pre-restart hours. mergePriorCoverage
// carries prior Hours/Available/Gaps into the fresh catalog so retention sees (and
// prunes) them instead of orphaning the blobs and growing disk without bound.
func TestMergePriorCoverage(t *testing.T) {
t.Parallel()
prev := &Catalog{
Format: CatalogFormat,
Profiles: []ProfileDesc{
{ID: "p0", Hours: []HourRecord{{Hour: "2026/06/13/10", Sealed: true, SizeBytes: 100}}, Available: []MediaWindow{{FromMs: 1, ToMs: 2}}},
{ID: "p9-removed", Hours: []HourRecord{{Hour: "2026/06/13/09", Sealed: true, SizeBytes: 50}}},
},
Gaps: []Gap{{FromMs: 5, ToMs: 6, Reason: "drop"}},
}
cat := &Catalog{Format: CatalogFormat, Profiles: []ProfileDesc{{ID: "p0"}, {ID: "p1"}}}

mergePriorCoverage(cat, prev)

require.Len(t, cat.Profiles[0].Hours, 1, "p0 inherits prior hours")
require.Equal(t, int64(100), cat.Profiles[0].Hours[0].SizeBytes)
require.Len(t, cat.Profiles[0].Available, 1, "p0 inherits prior available windows")
require.Empty(t, cat.Profiles[1].Hours, "new profile p1 has no prior hours")

var removed *ProfileDesc
for i := range cat.Profiles {
if cat.Profiles[i].ID == "p9-removed" {
removed = &cat.Profiles[i]
}
}
require.NotNil(t, removed, "a profile only in the old catalog is carried forward so its hours stay prunable")
require.Len(t, removed.Hours, 1)
require.Len(t, cat.Gaps, 1, "gaps carried forward")
}