diff --git a/docs/audit/security-quality-audit.md b/docs/audit/security-quality-audit.md index 037e49bb..4f9afbd5 100644 --- a/docs/audit/security-quality-audit.md +++ b/docs/audit/security-quality-audit.md @@ -402,6 +402,8 @@ Covered as the HTTP-sink half of **S-2**. `batcher.go:414-446` POSTs to any oper --- #### A-5 (HIGH) — `manager.Register` swallows initial `ingestor.Start` failure → permanent zombie stream reported Active +> ✅ **FIXED** in `fix/lifecycle-self-heal` — `Register`'s synchronous start-failure branch now routes through `ReportInputError`, so the input degrades, error history is recorded, and failover / exhausted-handling engage (multi-input promotes a backup; single-input flips to Degraded + probe loop). `Register` also refuses a duplicate registration (no monitor-goroutine leak). Tests `TestRegister_StartFailureDrivesExhausted`, `TestRegister_RefusesDuplicate`. (The connects-but-no-packets Idle blind spot is a noted follow-up.) + **Files:** `internal/manager/service.go:415-421` (error only logged, `Register` returns nil), `:816` (`collectTimeoutIfNeeded` needs StatusActive), `:846/849` (`collectProbeIfNeeded` needs StatusDegraded); `coordinator.go:189-199` (StreamStatus = Active when registered + no degradation), `:964-966` (reconciler skips IsRunning). **Trigger:** `mgr.Register` → `s.ingestor.Start` fails synchronously — `NewPacketReader` error for `KindUnknown` schemes (including extension-less http(s) HLS URLs the API never validates), `file://` with an unresolvable VOD mount, or `copy://`/`mixer://` whose upstream vanished before bootstrap. `Register` returns nil; the coordinator records the stream started, `IsRunning==true`. @@ -413,6 +415,8 @@ Covered as the HTTP-sink half of **S-2**. 
`batcher.go:414-446` POSTs to any oper --- #### A-6 (HIGH) — `reloadTranscoderFull` partial failure strands the stream with no publisher/DVR while IsRunning stays true +> ✅ **FIXED** in `fix/lifecycle-self-heal` — any error after teardown in `reloadTranscoderFull` now routes through `reloadFailed`, which does a clean full `Stop` (idempotent against the partial state) so `IsRunning` flips false and the reconciler restarts the stream from the persisted config within one tick — a permanent invisible outage becomes a ≤10 s self-heal. Test `TestUpdate_ReloadFailureFullStops`. + **Files:** `internal/coordinator/coordinator.go:585-652` — teardown `:588-591` (stopDVR/pub.Stop/tc.Stop), early returns at `:638-640` (tc.Start fails → publisher never restarted) and `:646-648` (pub.Start fails → DVR never restarted). **Trigger:** `PUT /streams/{code}` with a transcoder topology change (nil↔non-nil, video.copy flip, watermark change per `diff.go:123-170`) while `tc.Start` fails — realistic: transcoder binary missing after a bad deploy (`transcoder/service.go:312-316` probes the binary at Start), NVENC/GPU error. The reload has already torn down DVR + publisher + old transcoder; the early return leaves ingest running with zero outputs. Manager registration is never dropped, so `IsRunning==true` and `reconcileOnce` skips it forever. Template hot-reload routes every dependent through the same `Update`, so one broken-binary template edit can strand many streams at once. @@ -450,6 +454,8 @@ Covered as the HTTP-sink half of **S-2**. 
`batcher.go:414-446` POSTs to any oper --- #### C-1 (HIGH) — No per-stream serialisation of coordinator Start/Stop/Update; manager.Register overwrites state and leaks the monitor goroutine +> ✅ **FIXED** in `fix/lifecycle-self-heal` — a per-stream lifecycle mutex (`lockStream`) now wraps the full body of `Start`/`Stop`/`Update` (via `startLocked`/`stopLocked`/`updateLocked` so re-entrant internal calls don't deadlock); a racing op re-checks `IsRunning` under the lock and no-ops. `manager.Register` refuses a duplicate registration instead of overwriting (no monitor-goroutine/ticker leak). Test `TestLifecycle_ConcurrentStartStopSerialised` (-race). + **Files:** `internal/coordinator/coordinator.go:271` (unlocked `IsRunning` TOCTOU), `:330-340` (loser's `pub.Start`-failure rollback runs `mgr.Unregister` + `buf.Delete` on the **winner**'s resources, keyed only by stream code); `internal/manager/service.go:401-403` (`s.streams[code]=state` overwrites unconditionally, prior cancel unreachable), `:405` (second monitor goroutine spawned), `:698-709` (orphaned monitor exits only on ctx.Done). **Trigger:** Two operations on the same code race the multi-step wiring — most reachably two concurrent `/restart`, or a `DELETE`/template-reload `Stop` racing a reconciler/handler `Start`. `Coordinator.Stop` can land anywhere inside `Start`'s wiring (including the slow `tc.Start` subprocess spawn). **Corrections:** the "bootstrap vs reconciler" sub-trigger is wrong (`BootstrapPersistedStreams` completes before `RunReconciler` is spawned); the pure double-Start window on the normal path is microseconds — the practically wide trigger is the unguarded Stop-interleaves-Start variant. 
diff --git a/internal/coordinator/coordinator.go b/internal/coordinator/coordinator.go index 322d46dd..84d3b91d 100644 --- a/internal/coordinator/coordinator.go +++ b/internal/coordinator/coordinator.go @@ -71,6 +71,13 @@ type Coordinator struct { // envelope including started_at + uptime_sec) so frontend can // render uptime without a Prometheus round-trip. startedAt map[domain.StreamCode]time.Time + + // lifecycleMu guards lifecycleLocks, the per-stream mutexes that serialise + // Start/Stop/Update so the multi-step pipeline wiring can't interleave. Without + // them a racing op's rollback tore down another op's buffers / manager + // registration (both keyed only by stream code) and leaked monitor goroutines (C-1). + lifecycleMu sync.Mutex + lifecycleLocks map[domain.StreamCode]*sync.Mutex } // streamDegradation flags the discrete failure modes that should pull a @@ -120,11 +127,12 @@ func New(i do.Injector) (*Coordinator, error) { } return s, true }, - renditions: make(map[domain.StreamCode][]string), - abrCopies: make(map[domain.StreamCode]*abrCopyEntry), - abrMixers: make(map[domain.StreamCode]*abrMixerEntry), - degradation: make(map[domain.StreamCode]*streamDegradation), - startedAt: make(map[domain.StreamCode]time.Time), + renditions: make(map[domain.StreamCode][]string), + abrCopies: make(map[domain.StreamCode]*abrCopyEntry), + abrMixers: make(map[domain.StreamCode]*abrMixerEntry), + degradation: make(map[domain.StreamCode]*streamDegradation), + startedAt: make(map[domain.StreamCode]time.Time), + lifecycleLocks: make(map[domain.StreamCode]*sync.Mutex), } // Watermarks library is optional — coordinator runs fine without it // (only direct ImagePath watermarks resolve). Tolerate "no provider". 
@@ -165,24 +173,40 @@ func newForTesting( m *metrics.Metrics, ) *Coordinator { c := &Coordinator{ - buf: buf, - mgr: mgr, - tc: tc, - pub: pub, - dvr: dvr, - bus: bus, - m: m, - renditions: make(map[domain.StreamCode][]string), - abrCopies: make(map[domain.StreamCode]*abrCopyEntry), - abrMixers: make(map[domain.StreamCode]*abrMixerEntry), - degradation: make(map[domain.StreamCode]*streamDegradation), - startedAt: make(map[domain.StreamCode]time.Time), + buf: buf, + mgr: mgr, + tc: tc, + pub: pub, + dvr: dvr, + bus: bus, + m: m, + renditions: make(map[domain.StreamCode][]string), + abrCopies: make(map[domain.StreamCode]*abrCopyEntry), + abrMixers: make(map[domain.StreamCode]*abrMixerEntry), + degradation: make(map[domain.StreamCode]*streamDegradation), + startedAt: make(map[domain.StreamCode]time.Time), + lifecycleLocks: make(map[domain.StreamCode]*sync.Mutex), } c.mgr.SetExhaustedCallback(c.handleAllInputsExhausted) c.mgr.SetRestoredCallback(c.handleInputRestored) return c } +// lockStream acquires the per-stream lifecycle lock and returns its unlock +// func. Start/Stop/Update hold it for their whole body so they serialise per +// stream; the *Locked variants run under it and must not re-acquire (C-1). +func (c *Coordinator) lockStream(code domain.StreamCode) func() { + c.lifecycleMu.Lock() + mu, ok := c.lifecycleLocks[code] + if !ok { + mu = &sync.Mutex{} + c.lifecycleLocks[code] = mu + } + c.lifecycleMu.Unlock() + mu.Lock() + return mu.Unlock +} + // SetUpstreamLookupForTesting injects an upstream stream resolver — used by // tests that exercise the ABR-copy branch without spinning up a store backend. 
func (c *Coordinator) SetUpstreamLookupForTesting(fn func(domain.StreamCode) (*domain.Stream, bool)) { @@ -278,6 +302,15 @@ func (c *Coordinator) Start(ctx context.Context, stream *domain.Stream) error { if stream == nil { return fmt.Errorf("coordinator: nil stream") } + unlock := c.lockStream(stream.Code) + defer unlock() + return c.startLocked(ctx, stream) +} + +// startLocked is Start's body; the caller holds the per-stream lifecycle lock +// (Start, or Update which already holds it). The IsRunning check is the +// re-check under the lock so a racing Start collapses to a clean no-op (C-1). +func (c *Coordinator) startLocked(ctx context.Context, stream *domain.Stream) error { if c.IsRunning(stream.Code) { return nil } @@ -447,6 +480,14 @@ func (c *Coordinator) RunningStreams() []domain.StreamCode { // ctx is used for the DVR stop and the EventStreamStopped publish; cleanup // of in-memory state always proceeds even if ctx is cancelled. func (c *Coordinator) Stop(ctx context.Context, streamID domain.StreamCode) { + unlock := c.lockStream(streamID) + defer unlock() + c.stopLocked(ctx, streamID) +} + +// stopLocked is Stop's body; the caller holds the per-stream lifecycle lock +// (Stop, Update, or the reloadTranscoderFull self-heal fallback). +func (c *Coordinator) stopLocked(ctx context.Context, streamID domain.StreamCode) { c.abrMu.Lock() abr, isABR := c.abrCopies[streamID] if isABR { @@ -505,7 +546,15 @@ func (c *Coordinator) Update(ctx context.Context, old, new *domain.Stream) error if old == nil || new == nil { return fmt.Errorf("coordinator: Update requires both old and new stream") } + unlock := c.lockStream(new.Code) + defer unlock() + return c.updateLocked(ctx, old, new) +} +// updateLocked is Update's body; it holds the per-stream lifecycle lock so it +// calls startLocked/stopLocked (never the public, re-locking variants) and +// serialises against a concurrent Start/Stop (C-1). 
+func (c *Coordinator) updateLocked(ctx context.Context, old, new *domain.Stream) error { // ABR copy / ABR mixer use custom pipelines (no ingestor / transcoder), // so the per-component diff routing doesn't apply. Whenever either side // is on one of these paths we full-cycle: stop whatever is running, then @@ -518,11 +567,11 @@ func (c *Coordinator) Update(ctx context.Context, old, new *domain.Stream) error _, _, willBeABRMixer := c.detectABRMixer(new) willBeABRMixer = willBeABRMixer && !new.Disabled if wasABRCopy || wasABRMixer || willBeABRCopy || willBeABRMixer { - c.Stop(ctx, old.Code) + c.stopLocked(ctx, old.Code) if new.Disabled || len(new.Inputs) == 0 { return nil } - return c.Start(ctx, new) + return c.startLocked(ctx, new) } diff := ComputeDiff(old, new) @@ -539,7 +588,7 @@ func (c *Coordinator) Update(ctx context.Context, old, new *domain.Stream) error ) if diff.NowDisabled { - c.Stop(ctx, new.Code) + c.stopLocked(ctx, new.Code) return nil } @@ -646,7 +695,7 @@ func (c *Coordinator) reloadTranscoderFull(ctx context.Context, old, new *domain if len(transcoderTargets) > 0 { tcRT := c.transcoderConfigWithWatermark(new) if err := c.tc.Start(ctx, new.Code, newIngestID, tcRT, transcoderTargets); err != nil { - return fmt.Errorf("transcoder start: %w", err) + return c.reloadFailed(ctx, new.Code, fmt.Errorf("transcoder start: %w", err)) } c.rendMu.Lock() c.renditions[new.Code] = renditionSlugs @@ -654,13 +703,30 @@ func (c *Coordinator) reloadTranscoderFull(ctx context.Context, old, new *domain } if err := c.pub.Start(ctx, new); err != nil { - return fmt.Errorf("publisher start: %w", err) + return c.reloadFailed(ctx, new.Code, fmt.Errorf("publisher start: %w", err)) } c.reloadDVR(ctx, new) return nil } +// reloadFailed handles a reloadTranscoderFull error that occurs AFTER teardown +// has begun. 
By that point DVR + publisher + the old transcoder are already +// down, but ingest may still be registered — a bare return would leave the +// stream IsRunning=true with zero output and the reconciler blind to it (A-6, +// e.g. a transcoder binary missing after a bad deploy, NVENC fault). Fall back +// to a clean full stop (Stop is idempotent against the partial state): IsRunning +// flips false, so the 10 s reconcile tick restarts the stream from the persisted +// config — a permanent invisible outage becomes a ≤10 s self-healing one, and a +// retried identical PUT becomes meaningful. The caller holds the lifecycle lock, +// so use stopLocked (not Stop) to avoid re-acquiring it. +func (c *Coordinator) reloadFailed(ctx context.Context, code domain.StreamCode, err error) error { + slog.Warn("coordinator: transcoder reload failed mid-flight — full stop for reconciler self-heal", + "stream_code", code, "err", err) + c.stopLocked(ctx, code) + return err +} + // reloadProfiles applies per-profile transcoder changes without touching unchanged profiles. // Updated profiles: stop+start the corresponding rendition. // Added profiles: create rendition buffer + start the rendition. diff --git a/internal/coordinator/lifecycle_test.go b/internal/coordinator/lifecycle_test.go new file mode 100644 index 00000000..a48b83e5 --- /dev/null +++ b/internal/coordinator/lifecycle_test.go @@ -0,0 +1,56 @@ +package coordinator + +import ( + "context" + "errors" + "sync" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/ntt0601zcoder/open-streamer/internal/domain" +) + +// A-6: when a transcoder-topology Update tears the pipeline down and the new +// transcoder fails to start, the coordinator must NOT leave the stream +// IsRunning=true with no output (the reconciler would never restart it). It +// falls back to a full stop so the reconciler self-heals on the next tick. 
+func TestUpdate_ReloadFailureFullStops(t *testing.T) { + t.Parallel() + h := newHarness(t) + old := &domain.Stream{Code: "rs", Inputs: []domain.Input{{URL: "udp://x:1", Priority: 0}}} + require.NoError(t, h.coord.Start(context.Background(), old)) + require.True(t, h.coord.IsRunning("rs")) + + h.tc.startErr = errors.New("transcoder binary missing") + // nil → non-nil transcoder is a topology change → reloadTranscoderFull. + updated := &domain.Stream{ + Code: "rs", + Inputs: []domain.Input{{URL: "udp://x:1", Priority: 0}}, + Transcoder: &domain.TranscoderConfig{ + Video: domain.VideoTranscodeConfig{Profiles: []domain.VideoProfile{{Width: 1280, Height: 720, Bitrate: 2000}}}, + }, + } + err := h.coord.Update(context.Background(), old, updated) + require.Error(t, err, "reload must surface the transcoder start failure") + require.False(t, h.coord.IsRunning("rs"), + "a mid-flight reload failure must full-stop so the reconciler restarts it (A-6)") +} + +// C-1: concurrent Start/Stop on the same stream must be serialised by the +// per-stream lifecycle lock — without it the racing ops corrupt shared +// buffer/rendition state and leak monitor goroutines. Run under -race; the +// assertion is the absence of a data race / panic. 
+func TestLifecycle_ConcurrentStartStopSerialised(t *testing.T) { + t.Parallel() + h := newHarness(t) + st := &domain.Stream{Code: "cc", Inputs: []domain.Input{{URL: "udp://x:1", Priority: 0}}} + var wg sync.WaitGroup + for i := 0; i < 25; i++ { + wg.Add(2) + go func() { defer wg.Done(); _ = h.coord.Start(context.Background(), st) }() + go func() { defer wg.Done(); h.coord.Stop(context.Background(), "cc") }() + } + wg.Wait() + h.coord.Stop(context.Background(), "cc") // leave clean +} diff --git a/internal/coordinator/update_test.go b/internal/coordinator/update_test.go index 80278b26..56a038af 100644 --- a/internal/coordinator/update_test.go +++ b/internal/coordinator/update_test.go @@ -105,12 +105,16 @@ type spyTC struct { stopped []domain.StreamCode profilesStopped []int profilesStarted []int + startErr error // when set, Start returns it (drives the A-6 reload-failure path) } func (t *spyTC) Start(_ context.Context, c domain.StreamCode, _ domain.StreamCode, _ *domain.TranscoderConfig, _ []transcoder.RenditionTarget) error { t.mu.Lock() + defer t.mu.Unlock() + if t.startErr != nil { + return t.startErr + } t.started = append(t.started, c) - t.mu.Unlock() return nil } diff --git a/internal/manager/service.go b/internal/manager/service.go index 335be591..976995ed 100644 --- a/internal/manager/service.go +++ b/internal/manager/service.go @@ -399,6 +399,15 @@ func (s *Service) Register(ctx context.Context, stream *domain.Stream, bufferWri } s.mu.Lock() + if _, exists := s.streams[stream.Code]; exists { + // Already registered: overwriting would orphan the prior monitor + // goroutine + ticker (cancel unreachable) and let a concurrent + // Unregister stop the new worker. Refuse — the coordinator's + // per-stream lifecycle lock makes this a defensive backstop (C-1). 
+ s.mu.Unlock() + cancel() + return fmt.Errorf("manager: stream %q already registered", stream.Code) + } s.streams[stream.Code] = state s.mu.Unlock() @@ -419,6 +428,13 @@ func (s *Service) Register(ctx context.Context, stream *domain.Stream, bufferWri "input_priority", best.Input.Priority, "err", err, ) + // Route the synchronous start failure through the normal degrade + // path so the input is marked degraded, error history recorded, and + // failover / exhausted-handling engage. Without this the stream sat + // Idle while StreamStatus reported Active and the reconciler was + // blind (A-5): multi-input promotes a backup; single-input flips to + // Degraded and the probe loop keeps trying to recover. + s.ReportInputError(stream.Code, best.Input.Priority, err) } else { // Record the baseline event so the UI's switch history shows // "stream came up on input N at T" before any failover happens. diff --git a/internal/manager/service_lifecycle_test.go b/internal/manager/service_lifecycle_test.go index 4685ed54..2dab1173 100644 --- a/internal/manager/service_lifecycle_test.go +++ b/internal/manager/service_lifecycle_test.go @@ -527,3 +527,38 @@ func TestRestoredCallback_FiresAfterExhaustedRecovers(t *testing.T) { } } } + +// A-5 + C-1: re-registering an already-registered stream must error, not +// silently overwrite the prior monitor goroutine + ticker (leak) and let a +// concurrent Unregister stop the new worker. 
+func TestRegister_RefusesDuplicate(t *testing.T) { + t.Parallel() + svc, _ := newSvc(t) + st := &domain.Stream{Code: "dup", Inputs: []domain.Input{{URL: "udp://x:1", Priority: 0}}} + require.NoError(t, svc.Register(context.Background(), st, "dup")) + require.Error(t, svc.Register(context.Background(), st, "dup"), + "re-registering an active stream must error") +} + +// A-5: a synchronous ingest-start failure must degrade the input and (for a +// single-input stream) drive it to exhausted — not sit silently Idle while the +// coordinator reports the stream Active and the reconciler stays blind. +func TestRegister_StartFailureDrivesExhausted(t *testing.T) { + t.Parallel() + svc, ing := newSvc(t) + ing.startErr = errors.New("dead source") + exhausted := make(chan domain.StreamCode, 1) + svc.SetExhaustedCallback(func(c domain.StreamCode) { + select { + case exhausted <- c: + default: + } + }) + st := &domain.Stream{Code: "deadsrc", Inputs: []domain.Input{{URL: "udp://x:1", Priority: 0}}} + require.NoError(t, svc.Register(context.Background(), st, "deadsrc")) + select { + case <-exhausted: + case <-time.After(2 * time.Second): + t.Fatal("single-input start failure must degrade → exhausted (A-5)") + } +}