diff --git a/docs/audit/security-quality-audit.md b/docs/audit/security-quality-audit.md index 71287560..037e49bb 100644 --- a/docs/audit/security-quality-audit.md +++ b/docs/audit/security-quality-audit.md @@ -327,6 +327,8 @@ Covered as the HTTP-sink half of **S-2**. `batcher.go:414-446` POSTs to any oper --- #### D-1 (HIGH) — DVR catalog wiped on every recording restart → orphaned blobs, retention defeated, unbounded disk growth +> ✅ **FIXED** in `fix/dvr-data-loss` — `StartRecording` now `LoadCatalog`s the prior catalog and `mergePriorCoverage` carries its `Hours`/`Available`/`Gaps` (and profiles only in the old catalog) into the fresh one, so `pruneOnce` sees and prunes pre-restart hours (retention is wall/size-anchored → correct regardless of origin). The new run anchors its own media origin so recent timeshift keeps working; playing back **across** a restart boundary needs a per-hour reader anchor (noted follow-up). Test `TestMergePriorCoverage`. + **Files:** `internal/dvr/blob/service.go:96/100` (`newCatalog` + `Save`, `:251-274`); `LoadCatalog` only at `reader.go:48`; retention `retention.go:48-93`; `recovery.go:50-70` (`RepairStream` only seals `.open`-sentinel crash-dirty hours); trigger `coordinator.go:360/745-748`. **Merges:** two reported findings (identical root cause). @@ -339,6 +341,8 @@ Covered as the HTTP-sink half of **S-2**. `batcher.go:414-446` POSTs to any oper --- #### D-2 (HIGH) — Raw-TS / passthrough streams record ZERO bytes while `recording_status` reports "recording" +> ✅ **FIXED (fail-loud)** in `fix/dvr-data-loss` — `blobProfiles` now refuses a DVR lane for a non-transcoded raw-TS source (`StreamMainBufferIsTS`), so no catalog or Recording row is created and the status no longer lies. Making raw-TS DVR actually record (wire `pkt.TS` demux → AVPacket → `ingestAV`) is a deliberate follow-up. Test `TestBlobProfiles_RefusesRawTSSource`. 
+ **Files:** `internal/dvr/blob/writer.go:102-120` (`Ingest` handles only `pkt.AV`; `pkt.TS` is an explicit unimplemented stub at `:117-118`); `service.go:100-113` (catalog + Recording row saved unconditionally); `coordinator.go:780-784` (`blobProfiles` single `p0` lane on `stream.Code`); status `blob_timeshift.go:45-48`. **Trigger:** Enable DVR on any non-transcoded stream whose ingest writes raw MPEG-TS into the buffer hub: UDP multicast, HLS-pull, HTTP-TS, SRT, or file. `profileWriter.Ingest` discards `pkt.TS` silently (returns nil), so no blob/fragment is ever written. diff --git a/internal/coordinator/coordinator.go b/internal/coordinator/coordinator.go index 846cc285..322d46dd 100644 --- a/internal/coordinator/coordinator.go +++ b/internal/coordinator/coordinator.go @@ -790,6 +790,16 @@ func (c *Coordinator) stopDVR(ctx context.Context, code domain.StreamCode) { func (c *Coordinator) blobProfiles(stream *domain.Stream) []blob.ProfileSub { rends := buffer.RenditionsForTranscoder(stream.Code, stream.Transcoder) if len(rends) == 0 { + // Non-transcoded: the p0 lane records the stream's main buffer, but the + // blob writer only ingests AVPackets. A raw-TS main buffer (UDP / + // HLS-pull / HTTP-TS / SRT / file) records ZERO bytes while the status + // still reports "recording" (D-2). Refuse loudly so no catalog or + // Recording row is created and the status never lies. 
+ if domain.StreamMainBufferIsTS(stream) { + slog.Warn("coordinator: DVR unsupported for a raw-TS source (writer records nothing) — skipping recording", + "stream_code", stream.Code) + return nil + } return []blob.ProfileSub{{ID: "p0", BufferID: stream.Code, IsAudioSource: true}} } best := buffer.BestRenditionIndex(rends) diff --git a/internal/coordinator/reconciler_test.go b/internal/coordinator/reconciler_test.go index a239c131..8dacb9a4 100644 --- a/internal/coordinator/reconciler_test.go +++ b/internal/coordinator/reconciler_test.go @@ -196,3 +196,17 @@ func TestReconcileOnceStartsTemplateInheritedInputs(t *testing.T) { assert.True(t, h.coord.IsRunning("inherits"), "stream inheriting Inputs from a template must be started (B-5)") } + +// D-2: a non-transcoded raw-TS source (UDP/HLS-pull/HTTP-TS/SRT/file) writes TS +// chunks the blob writer can't ingest — recording it produced ZERO bytes while +// status said "recording". blobProfiles must refuse it (no lane) so no catalog +// or Recording row is created. A non-transcoded AV source still records. 
+func TestBlobProfiles_RefusesRawTSSource(t *testing.T) { // D-2 regression test for Coordinator.blobProfiles
+	t.Parallel()
+	h := newHarness(t) // package test harness; h.coord is the Coordinator under test
+	raw := &domain.Stream{Code: "udp-dvr", DVR: &domain.StreamDVRConfig{Enabled: true}, Inputs: []domain.Input{{URL: "udp://239.0.0.1:1234"}}} // no Transcoder set; presumably takes the non-transcoded branch — confirm against buffer.RenditionsForTranscoder
+	require.Nil(t, h.coord.blobProfiles(raw), "raw-TS source must not get a DVR lane (D-2)") // a nil result means no lane is handed to the recorder
+
+	av := &domain.Stream{Code: "rtmp-dvr", DVR: &domain.StreamDVRConfig{Enabled: true}, Inputs: []domain.Input{{URL: "rtmp://host/app/key"}}} // same shape as raw, only Code and input URL differ
+	require.NotEmpty(t, h.coord.blobProfiles(av), "non-transcoded AV source records via the p0 lane") // control case: the guard must not block AV sources
+}
 diff --git a/internal/dvr/blob/service.go b/internal/dvr/blob/service.go index b24c97ba..e9f6b1a8 100644 --- a/internal/dvr/blob/service.go +++ b/internal/dvr/blob/service.go @@ -87,13 +87,24 @@ func (s *Service) StartRecording(ctx context.Context, code domain.StreamCode, pr return nil, err } + cat := newCatalog(code, profiles, cfg) + // Merge prior on-disk coverage so retention sees (and prunes) pre-restart + // hours instead of orphaning them. A fresh empty catalog overwriting + // catalog.json on every restart defeated retention and grew disk without + // bound (D-1). Hours/Available/Gaps are wall- and size-anchored, so + // retention stays correct regardless of the per-run media origin; the new + // run anchors its own origin, so recent timeshift keeps working. 
+ if prev, ok, lerr := LoadCatalog(streamDir); lerr == nil && ok && prev.Format == CatalogFormat { + mergePriorCoverage(cat, prev) + } + s.mu.Lock() if _, ok := s.active[code]; ok { s.mu.Unlock() return nil, fmt.Errorf("blob: already recording %s", code) } rctx, cancel := context.WithCancel(context.WithoutCancel(ctx)) //nolint:contextcheck // lane outlives the request - rec := &recording{streamDir: streamDir, cancel: cancel, cat: newCatalog(code, profiles, cfg)} + rec := &recording{streamDir: streamDir, cancel: cancel, cat: cat} s.active[code] = rec s.mu.Unlock() @@ -273,6 +284,39 @@ func newCatalog(code domain.StreamCode, profiles []ProfileSub, cfg *domain.Strea return cat } +// mergePriorCoverage carries the on-disk coverage of a previously-saved catalog +// into a freshly-built one (same stream, new run), so a restart no longer wipes +// the record of pre-restart hours — which had left their blobs orphaned and +// uncounted by retention, growing disk without bound (D-1). +// +// Only wall/size-anchored coverage is carried (Hours, Available, Gaps); profile +// metadata and Retention come from the new config. Media-origin is deliberately +// NOT carried — the new run anchors its own origin so recent timeshift keeps +// working; pre-restart hours stay listed and prunable (retention is wall/size +// based) even though playing back across the restart boundary needs a per-hour +// reader anchor (follow-up). Profiles present only in the old catalog (ladder +// reorder / removed rung) are carried forward so their hours remain prunable. 
+func mergePriorCoverage(cat, prev *Catalog) { // mutates cat in place; dereferences both arguments without a nil check
+	prevByID := make(map[string]*ProfileDesc, len(prev.Profiles)) // prior profiles indexed by ID
+	for i := range prev.Profiles {
+		prevByID[prev.Profiles[i].ID] = &prev.Profiles[i] // pointer into prev.Profiles (no struct copy); a repeated ID keeps the last entry
+	}
+	have := make(map[string]bool, len(cat.Profiles)) // IDs present in the fresh catalog
+	for i := range cat.Profiles {
+		have[cat.Profiles[i].ID] = true
+		if p, ok := prevByID[cat.Profiles[i].ID]; ok { // profile exists in both catalogs
+			cat.Profiles[i].Hours = p.Hours // assigned as-is, not cloned; every other field keeps the fresh catalog's value
+			cat.Profiles[i].Available = p.Available
+		}
+	}
+	for i := range prev.Profiles {
+		if !have[prev.Profiles[i].ID] { // profile only in the prior catalog
+			cat.Profiles = append(cat.Profiles, prev.Profiles[i]) // whole prior descriptor appended after the fresh profiles
+		}
+	}
+	cat.Gaps = prev.Gaps // replaces whatever cat.Gaps held
+}
+
 // segDurFromCfg resolves the fragment target duration. func segDurFromCfg(cfg *domain.StreamDVRConfig) time.Duration { if cfg != nil && cfg.SegmentDuration > 0 { diff --git a/internal/dvr/blob/service_test.go b/internal/dvr/blob/service_test.go new file mode 100644 index 00000000..d386af4c --- /dev/null +++ b/internal/dvr/blob/service_test.go @@ -0,0 +1,40 @@ +package blob + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +// D-1: a restart must not wipe the record of pre-restart hours. mergePriorCoverage +// carries prior Hours/Available/Gaps into the fresh catalog so retention sees (and +// prunes) them instead of orphaning the blobs and growing disk without bound. 
+func TestMergePriorCoverage(t *testing.T) { // D-1 regression test: checks what mergePriorCoverage copies from prev into cat
+	t.Parallel()
+	prev := &Catalog{ // stands in for the catalog saved by the previous run
+		Format: CatalogFormat,
+		Profiles: []ProfileDesc{
+			{ID: "p0", Hours: []HourRecord{{Hour: "2026/06/13/10", Sealed: true, SizeBytes: 100}}, Available: []MediaWindow{{FromMs: 1, ToMs: 2}}}, // ID also present in cat
+			{ID: "p9-removed", Hours: []HourRecord{{Hour: "2026/06/13/09", Sealed: true, SizeBytes: 50}}}, // ID absent from cat
+		},
+		Gaps: []Gap{{FromMs: 5, ToMs: 6, Reason: "drop"}},
+	}
+	cat := &Catalog{Format: CatalogFormat, Profiles: []ProfileDesc{{ID: "p0"}, {ID: "p1"}}} // fresh catalog: p0 overlaps prev, p1 is new
+
+	mergePriorCoverage(cat, prev) // merges into cat in place
+
+	require.Len(t, cat.Profiles[0].Hours, 1, "p0 inherits prior hours")
+	require.Equal(t, int64(100), cat.Profiles[0].Hours[0].SizeBytes) // the carried hour is prev's p0 record (size 100)
+	require.Len(t, cat.Profiles[0].Available, 1, "p0 inherits prior available windows")
+	require.Empty(t, cat.Profiles[1].Hours, "new profile p1 has no prior hours")
+
+	var removed *ProfileDesc // located by ID, so the test does not depend on where the carried profile lands
+	for i := range cat.Profiles {
+		if cat.Profiles[i].ID == "p9-removed" {
+			removed = &cat.Profiles[i]
+		}
+	}
+	require.NotNil(t, removed, "a profile only in the old catalog is carried forward so its hours stay prunable")
+	require.Len(t, removed.Hours, 1) // its single prior hour came along with it
+	require.Len(t, cat.Gaps, 1, "gaps carried forward")
+}