6 changes: 6 additions & 0 deletions docs/audit/security-quality-audit.md
@@ -402,6 +402,8 @@ Covered as the HTTP-sink half of **S-2**. `batcher.go:414-446` POSTs to any oper
---

#### A-5 (HIGH) — `manager.Register` swallows initial `ingestor.Start` failure → permanent zombie stream reported Active
> ✅ **FIXED** in `fix/lifecycle-self-heal` — `Register`'s synchronous start-failure branch now routes through `ReportInputError`, so the input degrades, error history is recorded, and failover / exhausted-handling engage (multi-input promotes a backup; single-input flips to Degraded + probe loop). `Register` also refuses a duplicate registration (no monitor-goroutine leak). Tests `TestRegister_StartFailureDrivesExhausted`, `TestRegister_RefusesDuplicate`. (The connects-but-no-packets Idle blind spot is a noted follow-up.)

**Files:** `internal/manager/service.go:415-421` (error only logged, `Register` returns nil), `:816` (`collectTimeoutIfNeeded` needs StatusActive), `:846/849` (`collectProbeIfNeeded` needs StatusDegraded); `coordinator.go:189-199` (StreamStatus = Active when registered + no degradation), `:964-966` (reconciler skips IsRunning).

**Trigger:** `mgr.Register` → `s.ingestor.Start` fails synchronously — `NewPacketReader` error for `KindUnknown` schemes (including extension-less http(s) HLS URLs the API never validates), `file://` with an unresolvable VOD mount, or `copy://`/`mixer://` whose upstream vanished before bootstrap. `Register` returns nil; the coordinator records the stream started, `IsRunning==true`.
@@ -413,6 +415,8 @@ Covered as the HTTP-sink half of **S-2**. `batcher.go:414-446` POSTs to any oper
---

#### A-6 (HIGH) — `reloadTranscoderFull` partial failure strands the stream with no publisher/DVR while IsRunning stays true
> ✅ **FIXED** in `fix/lifecycle-self-heal` — any error after teardown in `reloadTranscoderFull` now routes through `reloadFailed`, which does a clean full `Stop` (idempotent against the partial state) so `IsRunning` flips false and the reconciler restarts the stream from the persisted config within one tick — a permanent invisible outage becomes a ≤10 s self-heal. Test `TestUpdate_ReloadFailureFullStops`.

**Files:** `internal/coordinator/coordinator.go:585-652` — teardown `:588-591` (stopDVR/pub.Stop/tc.Stop), early returns at `:638-640` (tc.Start fails → publisher never restarted) and `:646-648` (pub.Start fails → DVR never restarted).

**Trigger:** `PUT /streams/{code}` with a transcoder topology change (nil↔non-nil, video.copy flip, watermark change per `diff.go:123-170`) while `tc.Start` fails — realistic: transcoder binary missing after a bad deploy (`transcoder/service.go:312-316` probes the binary at Start), NVENC/GPU error. The reload has already torn down DVR + publisher + old transcoder; the early return leaves ingest running with zero outputs. Manager registration is never dropped, so `IsRunning==true` and `reconcileOnce` skips it forever. Template hot-reload routes every dependent through the same `Update`, so one broken-binary template edit can strand many streams at once.
@@ -450,6 +454,8 @@ Covered as the HTTP-sink half of **S-2**. `batcher.go:414-446` POSTs to any oper
---

#### C-1 (HIGH) — No per-stream serialisation of coordinator Start/Stop/Update; manager.Register overwrites state and leaks the monitor goroutine
> ✅ **FIXED** in `fix/lifecycle-self-heal` — a per-stream lifecycle mutex (`lockStream`) now wraps the full body of `Start`/`Stop`/`Update` (via `startLocked`/`stopLocked`/`updateLocked` so re-entrant internal calls don't deadlock); a racing op re-checks `IsRunning` under the lock and no-ops. `manager.Register` refuses a duplicate registration instead of overwriting (no monitor-goroutine/ticker leak). Test `TestLifecycle_ConcurrentStartStopSerialised` (-race).

**Files:** `internal/coordinator/coordinator.go:271` (unlocked `IsRunning` TOCTOU), `:330-340` (loser's `pub.Start`-failure rollback runs `mgr.Unregister` + `buf.Delete` on the **winner**'s resources, keyed only by stream code); `internal/manager/service.go:401-403` (`s.streams[code]=state` overwrites unconditionally, prior cancel unreachable), `:405` (second monitor goroutine spawned), `:698-709` (orphaned monitor exits only on ctx.Done).

**Trigger:** Two operations on the same code race the multi-step wiring — most reachably two concurrent `/restart`, or a `DELETE`/template-reload `Stop` racing a reconciler/handler `Start`. `Coordinator.Stop` can land anywhere inside `Start`'s wiring (including the slow `tc.Start` subprocess spawn). **Corrections:** the "bootstrap vs reconciler" sub-trigger is wrong (`BootstrapPersistedStreams` completes before `RunReconciler` is spawned); the pure double-Start window on the normal path is microseconds — the practically wide trigger is the unguarded Stop-interleaves-Start variant.
110 changes: 88 additions & 22 deletions internal/coordinator/coordinator.go
@@ -71,6 +71,13 @@ type Coordinator struct {
// envelope including started_at + uptime_sec) so frontend can
// render uptime without a Prometheus round-trip.
startedAt map[domain.StreamCode]time.Time

// lifecycleLocks serialises Start/Stop/Update per stream so the multi-step
// pipeline wiring can't interleave. Without it a racing op's rollback tore
// down another op's buffers / manager registration (both keyed only by
// stream code) and leaked monitor goroutines (C-1).
lifecycleMu sync.Mutex
lifecycleLocks map[domain.StreamCode]*sync.Mutex
}

// streamDegradation flags the discrete failure modes that should pull a
@@ -120,11 +127,12 @@ func New(i do.Injector) (*Coordinator, error) {
}
return s, true
},
- renditions: make(map[domain.StreamCode][]string),
- abrCopies: make(map[domain.StreamCode]*abrCopyEntry),
- abrMixers: make(map[domain.StreamCode]*abrMixerEntry),
- degradation: make(map[domain.StreamCode]*streamDegradation),
- startedAt: make(map[domain.StreamCode]time.Time),
+ renditions: make(map[domain.StreamCode][]string),
+ abrCopies: make(map[domain.StreamCode]*abrCopyEntry),
+ abrMixers: make(map[domain.StreamCode]*abrMixerEntry),
+ degradation: make(map[domain.StreamCode]*streamDegradation),
+ startedAt: make(map[domain.StreamCode]time.Time),
+ lifecycleLocks: make(map[domain.StreamCode]*sync.Mutex),
}
// Watermarks library is optional — coordinator runs fine without it
// (only direct ImagePath watermarks resolve). Tolerate "no provider".
@@ -165,24 +173,40 @@ func newForTesting(
m *metrics.Metrics,
) *Coordinator {
c := &Coordinator{
- buf: buf,
- mgr: mgr,
- tc: tc,
- pub: pub,
- dvr: dvr,
- bus: bus,
- m: m,
- renditions: make(map[domain.StreamCode][]string),
- abrCopies: make(map[domain.StreamCode]*abrCopyEntry),
- abrMixers: make(map[domain.StreamCode]*abrMixerEntry),
- degradation: make(map[domain.StreamCode]*streamDegradation),
- startedAt: make(map[domain.StreamCode]time.Time),
+ buf: buf,
+ mgr: mgr,
+ tc: tc,
+ pub: pub,
+ dvr: dvr,
+ bus: bus,
+ m: m,
+ renditions: make(map[domain.StreamCode][]string),
+ abrCopies: make(map[domain.StreamCode]*abrCopyEntry),
+ abrMixers: make(map[domain.StreamCode]*abrMixerEntry),
+ degradation: make(map[domain.StreamCode]*streamDegradation),
+ startedAt: make(map[domain.StreamCode]time.Time),
+ lifecycleLocks: make(map[domain.StreamCode]*sync.Mutex),
}
c.mgr.SetExhaustedCallback(c.handleAllInputsExhausted)
c.mgr.SetRestoredCallback(c.handleInputRestored)
return c
}

// lockStream acquires the per-stream lifecycle lock and returns its unlock
// func. Start/Stop/Update hold it for their whole body so they serialise per
// stream; the *Locked variants run under it and must not re-acquire (C-1).
func (c *Coordinator) lockStream(code domain.StreamCode) func() {
c.lifecycleMu.Lock()
mu, ok := c.lifecycleLocks[code]
if !ok {
mu = &sync.Mutex{}
c.lifecycleLocks[code] = mu
}
c.lifecycleMu.Unlock()
mu.Lock()
return mu.Unlock
}

// SetUpstreamLookupForTesting injects an upstream stream resolver — used by
// tests that exercise the ABR-copy branch without spinning up a store backend.
func (c *Coordinator) SetUpstreamLookupForTesting(fn func(domain.StreamCode) (*domain.Stream, bool)) {
@@ -278,6 +302,15 @@ func (c *Coordinator) Start(ctx context.Context, stream *domain.Stream) error {
if stream == nil {
return fmt.Errorf("coordinator: nil stream")
}
unlock := c.lockStream(stream.Code)
defer unlock()
return c.startLocked(ctx, stream)
}

// startLocked is Start's body; the caller holds the per-stream lifecycle lock
// (Start, or Update which already holds it). The IsRunning check is the
// re-check under the lock so a racing Start collapses to a clean no-op (C-1).
func (c *Coordinator) startLocked(ctx context.Context, stream *domain.Stream) error {
if c.IsRunning(stream.Code) {
return nil
}
@@ -447,6 +480,14 @@ func (c *Coordinator) RunningStreams() []domain.StreamCode {
// ctx is used for the DVR stop and the EventStreamStopped publish; cleanup
// of in-memory state always proceeds even if ctx is cancelled.
func (c *Coordinator) Stop(ctx context.Context, streamID domain.StreamCode) {
unlock := c.lockStream(streamID)
defer unlock()
c.stopLocked(ctx, streamID)
}

// stopLocked is Stop's body; the caller holds the per-stream lifecycle lock
// (Stop, Update, or the reloadTranscoderFull self-heal fallback).
func (c *Coordinator) stopLocked(ctx context.Context, streamID domain.StreamCode) {
c.abrMu.Lock()
abr, isABR := c.abrCopies[streamID]
if isABR {
@@ -505,7 +546,15 @@ func (c *Coordinator) Update(ctx context.Context, old, new *domain.Stream) error
if old == nil || new == nil {
return fmt.Errorf("coordinator: Update requires both old and new stream")
}
unlock := c.lockStream(new.Code)
defer unlock()
return c.updateLocked(ctx, old, new)
}

// updateLocked is Update's body; it holds the per-stream lifecycle lock so it
// calls startLocked/stopLocked (never the public, re-locking variants) and
// serialises against a concurrent Start/Stop (C-1).
func (c *Coordinator) updateLocked(ctx context.Context, old, new *domain.Stream) error {
// ABR copy / ABR mixer use custom pipelines (no ingestor / transcoder),
// so the per-component diff routing doesn't apply. Whenever either side
// is on one of these paths we full-cycle: stop whatever is running, then
@@ -518,11 +567,11 @@ func (c *Coordinator) Update(ctx context.Context, old, new *domain.Stream) error
_, _, willBeABRMixer := c.detectABRMixer(new)
willBeABRMixer = willBeABRMixer && !new.Disabled
if wasABRCopy || wasABRMixer || willBeABRCopy || willBeABRMixer {
- c.Stop(ctx, old.Code)
+ c.stopLocked(ctx, old.Code)
if new.Disabled || len(new.Inputs) == 0 {
return nil
}
- return c.Start(ctx, new)
+ return c.startLocked(ctx, new)
}

diff := ComputeDiff(old, new)
@@ -539,7 +588,7 @@ func (c *Coordinator) Update(ctx context.Context, old, new *domain.Stream) error
)

if diff.NowDisabled {
- c.Stop(ctx, new.Code)
+ c.stopLocked(ctx, new.Code)
return nil
}

@@ -646,21 +695,38 @@ func (c *Coordinator) reloadTranscoderFull(ctx context.Context, old, new *domain
if len(transcoderTargets) > 0 {
tcRT := c.transcoderConfigWithWatermark(new)
if err := c.tc.Start(ctx, new.Code, newIngestID, tcRT, transcoderTargets); err != nil {
- return fmt.Errorf("transcoder start: %w", err)
+ return c.reloadFailed(ctx, new.Code, fmt.Errorf("transcoder start: %w", err))
}
c.rendMu.Lock()
c.renditions[new.Code] = renditionSlugs
c.rendMu.Unlock()
}

if err := c.pub.Start(ctx, new); err != nil {
- return fmt.Errorf("publisher start: %w", err)
+ return c.reloadFailed(ctx, new.Code, fmt.Errorf("publisher start: %w", err))
}

c.reloadDVR(ctx, new)
return nil
}

// reloadFailed handles a reloadTranscoderFull error that occurs AFTER teardown
// has begun. By that point DVR + publisher + the old transcoder are already
// down, but ingest may still be registered — a bare return would leave the
// stream IsRunning=true with zero output and the reconciler blind to it (A-6,
// e.g. a transcoder binary missing after a bad deploy, NVENC fault). Fall back
// to a clean full stop (Stop is idempotent against the partial state): IsRunning
// flips false, so the 10 s reconcile tick restarts the stream from the persisted
// config — a permanent invisible outage becomes a ≤10 s self-healing one, and a
// retried identical PUT becomes meaningful. The caller holds the lifecycle lock,
// so use stopLocked (not Stop) to avoid re-acquiring it.
func (c *Coordinator) reloadFailed(ctx context.Context, code domain.StreamCode, err error) error {
slog.Warn("coordinator: transcoder reload failed mid-flight — full stop for reconciler self-heal",
"stream_code", code, "err", err)
c.stopLocked(ctx, code)
return err
}

// reloadProfiles applies per-profile transcoder changes without touching unchanged profiles.
// Updated profiles: stop+start the corresponding rendition.
// Added profiles: create rendition buffer + start the rendition.
56 changes: 56 additions & 0 deletions internal/coordinator/lifecycle_test.go
@@ -0,0 +1,56 @@
package coordinator

import (
"context"
"errors"
"sync"
"testing"

"github.com/stretchr/testify/require"

"github.com/ntt0601zcoder/open-streamer/internal/domain"
)

// A-6: when a transcoder-topology Update tears the pipeline down and the new
// transcoder fails to start, the coordinator must NOT leave the stream
// IsRunning=true with no output (the reconciler would never restart it). It
// falls back to a full stop so the reconciler self-heals on the next tick.
func TestUpdate_ReloadFailureFullStops(t *testing.T) {
t.Parallel()
h := newHarness(t)
old := &domain.Stream{Code: "rs", Inputs: []domain.Input{{URL: "udp://x:1", Priority: 0}}}
require.NoError(t, h.coord.Start(context.Background(), old))
require.True(t, h.coord.IsRunning("rs"))

h.tc.startErr = errors.New("transcoder binary missing")
// nil → non-nil transcoder is a topology change → reloadTranscoderFull.
updated := &domain.Stream{
Code: "rs",
Inputs: []domain.Input{{URL: "udp://x:1", Priority: 0}},
Transcoder: &domain.TranscoderConfig{
Video: domain.VideoTranscodeConfig{Profiles: []domain.VideoProfile{{Width: 1280, Height: 720, Bitrate: 2000}}},
},
}
err := h.coord.Update(context.Background(), old, updated)
require.Error(t, err, "reload must surface the transcoder start failure")
require.False(t, h.coord.IsRunning("rs"),
"a mid-flight reload failure must full-stop so the reconciler restarts it (A-6)")
}

// C-1: concurrent Start/Stop on the same stream must be serialised by the
// per-stream lifecycle lock — without it the racing ops corrupt shared
// buffer/rendition state and leak monitor goroutines. Run under -race; the
// assertion is the absence of a data race / panic.
func TestLifecycle_ConcurrentStartStopSerialised(t *testing.T) {
t.Parallel()
h := newHarness(t)
st := &domain.Stream{Code: "cc", Inputs: []domain.Input{{URL: "udp://x:1", Priority: 0}}}
var wg sync.WaitGroup
for i := 0; i < 25; i++ {
wg.Add(2)
go func() { defer wg.Done(); _ = h.coord.Start(context.Background(), st) }()
go func() { defer wg.Done(); h.coord.Stop(context.Background(), "cc") }()
}
wg.Wait()
h.coord.Stop(context.Background(), "cc") // leave clean
}
6 changes: 5 additions & 1 deletion internal/coordinator/update_test.go
@@ -105,12 +105,16 @@ type spyTC struct {
stopped []domain.StreamCode
profilesStopped []int
profilesStarted []int
startErr error // when set, Start returns it (drives the A-6 reload-failure path)
}

func (t *spyTC) Start(_ context.Context, c domain.StreamCode, _ domain.StreamCode, _ *domain.TranscoderConfig, _ []transcoder.RenditionTarget) error {
t.mu.Lock()
+ defer t.mu.Unlock()
+ if t.startErr != nil {
+ return t.startErr
+ }
t.started = append(t.started, c)
- t.mu.Unlock()
return nil
}

16 changes: 16 additions & 0 deletions internal/manager/service.go
@@ -399,6 +399,15 @@ func (s *Service) Register(ctx context.Context, stream *domain.Stream, bufferWri
}

s.mu.Lock()
if _, exists := s.streams[stream.Code]; exists {
// Already registered: overwriting would orphan the prior monitor
// goroutine + ticker (cancel unreachable) and let a concurrent
// Unregister stop the new worker. Refuse — the coordinator's
// per-stream lifecycle lock makes this a defensive backstop (C-1).
s.mu.Unlock()
cancel()
return fmt.Errorf("manager: stream %q already registered", stream.Code)
}
s.streams[stream.Code] = state
s.mu.Unlock()

@@ -419,6 +428,13 @@ func (s *Service) Register(ctx context.Context, stream *domain.Stream, bufferWri
"input_priority", best.Input.Priority,
"err", err,
)
// Route the synchronous start failure through the normal degrade
// path so the input is marked degraded, error history recorded, and
// failover / exhausted-handling engage. Without this the stream sat
// Idle while StreamStatus reported Active and the reconciler was
// blind (A-5): multi-input promotes a backup; single-input flips to
// Degraded and the probe loop keeps trying to recover.
s.ReportInputError(stream.Code, best.Input.Priority, err)
} else {
// Record the baseline event so the UI's switch history shows
// "stream came up on input N at T" before any failover happens.
35 changes: 35 additions & 0 deletions internal/manager/service_lifecycle_test.go
@@ -527,3 +527,38 @@ func TestRestoredCallback_FiresAfterExhaustedRecovers(t *testing.T) {
}
}
}

// A-5 + C-1: re-registering an already-registered stream must error, not
// silently overwrite the prior monitor goroutine + ticker (leak) and let a
// concurrent Unregister stop the new worker.
func TestRegister_RefusesDuplicate(t *testing.T) {
t.Parallel()
svc, _ := newSvc(t)
st := &domain.Stream{Code: "dup", Inputs: []domain.Input{{URL: "udp://x:1", Priority: 0}}}
require.NoError(t, svc.Register(context.Background(), st, "dup"))
require.Error(t, svc.Register(context.Background(), st, "dup"),
"re-registering an active stream must error")
}

// A-5: a synchronous ingest-start failure must degrade the input and (for a
// single-input stream) drive it to exhausted — not sit silently Idle while the
// coordinator reports the stream Active and the reconciler stays blind.
func TestRegister_StartFailureDrivesExhausted(t *testing.T) {
t.Parallel()
svc, ing := newSvc(t)
ing.startErr = errors.New("dead source")
exhausted := make(chan domain.StreamCode, 1)
svc.SetExhaustedCallback(func(c domain.StreamCode) {
select {
case exhausted <- c:
default:
}
})
st := &domain.Stream{Code: "deadsrc", Inputs: []domain.Input{{URL: "udp://x:1", Priority: 0}}}
require.NoError(t, svc.Register(context.Background(), st, "deadsrc"))
select {
case <-exhausted:
case <-time.After(2 * time.Second):
t.Fatal("single-input start failure must degrade → exhausted (A-5)")
}
}