diff --git a/CHANGELOG.md b/CHANGELOG.md index 730b695..db56639 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Typed event taxonomy with per-event notifier subscriptions (new releases, sent-to-client, errors) and a read-only per-topic event timeline. Download-finished event emission is deferred to a later phase (#93). + ## [1.4.0] - 2026-06-24 ### Added diff --git a/CLAUDE.md b/CLAUDE.md index e7c887a..b107136 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -50,9 +50,10 @@ techdebt/ Debt-tracking files (one per issue, see global rule) | `config` | env-driven config struct (caarlos0/env) — **add new env vars here** | | `crypto` | AES-256-GCM for tracker credentials and client config blobs | | `db` / `db/repo` | pgxpool wrapper + repository structs (`Topics`, `Clients`, `Notifiers`, `Users`, `TrackerCredentials`, `Deliveries`, `Audit`, `Settings`). `Settings` (repo added for the Sonarr integration) reads/writes the singleton `settings` row — `GetSonarr`/`UpsertSonarr` (API key encrypted at rest like `oidc_client_secret_enc`, migration `0009`) + `UpdateSonarrCursor` (history-poll cursor). `Topics` adds `GetByURL(user,url)` for the poller's dedup pre-check; `Users` adds `GetInitialAdmin`. `TrackerCredentials` carries encrypted `secret_enc` (password) **and** `session_enc`/`session_nonce` (cookie-session blob; migration `0002`), plus a nullable `session_expired_at` marker (migration `0003`) the scheduler uses to dedupe expiry notifications (atomic `MarkSessionExpired`, cleared by `SetSession` on re-auth). `Deliveries` (migration `0006`, `topic_deliveries`) records every torrent pushed to a client — `{topic_id, infohash, label, client_id, delivered_at}`, unique on `(topic_id, infohash)` so `Record` is idempotent (`ON CONFLICT DO NOTHING`). `Notifiers` gains `is_default bool` (migration `0008`, per-type unique partial index) + `Update(ctx, id, userID, name, displayName string, events []string, isDefault bool, configEnc, configNonce []byte) error` | -| **`notify`** | reusable notification dispatcher — `Send(userID, domain.Message)` fans out to all of a user's configured notifiers (best-effort, metered); `SendVia(userID, notifierID, …)` scopes the same fan-out to one notifier when a topic overrides it (nil ⇒ the user's **default** notifiers only (strict; none set ⇒ no send)). Consumers: scheduler new-release alerts + topic-error alerts (both per-topic via `SendVia`; error events deduped to fire once per error episode, on the first failure) + credential session-expiry alerts (global). The single event→notifier fan-out point | +| **`notify`** | reusable notification dispatcher — `Send(userID, domain.Message)` fans out to all of a user's configured notifiers (best-effort, metered); `SendVia(userID, notifierID, …)` scopes the same fan-out to one notifier when a topic overrides it (nil ⇒ the user's **default** notifiers only (strict; none set ⇒ no send)). Consumers: the `events.Bus` fan-out (per-event subscription, wired via `subscribed()` which maps typed `events.Type` to notifier event list; legacy `['updated','error']` rows kept working via dispatcher aliases). The single event→notifier fan-out point. Error events are deduped to fire once per error episode (on the first failure); session-expiry alerts are global one-shot | | `domain` | core types: `Topic` (incl. per-topic `ClientID`, **`NotifierID`** (per-topic notifier override, migration `0007`), `DownloadDir`, `Category`, `ImageURL`), `Check`, `Payload`, `TrackerCredential`, `TopicDelivery`, `AddOptions` (`DownloadDir` + `Category`) | -| **`topics`** | shared `BuildAndCreate(store, CreateInput)` — the one tracker-match → `Parse` → fail-open `ResolveMetadata` → build → persist sequence, used by BOTH the `POST /topics` handler and the Sonarr poller (no duplication). Idempotent: a `(user_id,url)` unique-violation returns `Result{Created:false}` instead of erroring. Sentinels `ErrNoTracker`/`ErrParse`/`ErrQualityUnsupported` | +| **`events`** | canonical event taxonomy (`Type` consts: topic.added, check.{started,completed,failed}, release.found, download.submitted, session.expired, download.{progress,completed}) + per-type `Policy` (persist/notifiable/sse) + `Bus.Emit` — the single event→sinks fan-out (history `topic_events`, notifier dispatcher, SSE seam). Phase-1 SSE publisher is nil (Phase 3). Phase-2 `download.{progress,completed}` policy rows exist but nothing emits them yet. `GET /api/v1/topics/{id}/events` endpoint reads the history. Frontend: `components/topics/TopicEventsTimeline`, `components/notifiers/EventPicker`, `lib/events.ts` | +| **`topics`** | shared `BuildAndCreate(store, CreateInput)` — the one tracker-match → `Parse` → fail-open `ResolveMetadata` → build → persist sequence, used by BOTH the `POST /topics` handler and the Sonarr poller (no duplication). Idempotent: a `(user_id,url)` unique-violation returns `Result{Created:false}` instead of erroring. Sentinels `ErrNoTracker`/`ErrParse`/`ErrQualityUnsupported`. **`TopicEvents`** sibling repo (history feed: `Record`/`ListForTopic`/`ListForUserSince`) | | **`sonarr`** | Sonarr integration (issue #86): a typed read-only API `Client` (`SystemStatus` for the Test button; `GrabHistorySince` reads `eventType=grabbed` history, extracting `data.nzbInfoUrl` — the tracker topic URL, **not** `guid`) + a `Poller` that mirrors the scheduler (ticker loop, ctx-cancel, fail-open). Self-gates on the DB `settings.sonarr_enabled`, resolves the owner (configured admin ⇒ first admin), dedups history by URL, filters by allowed trackers, and auto-creates topics via `topics.BuildAndCreate` with configured default client/category/dir. First enable is go-forward only (cursor stamped, no historical import). Admin config UI: **Integrations** page (`pages/Integrations.tsx` → `components/integrations/SonarrCard`); API `GET/PUT/POST /api/v1/system/sonarr{,/test}` | | **`infohash`** | derives the BitTorrent v1 infohash (lowercase hex) from a `Payload` — `FromMagnet` (parses `xt=urn:btih:`, hex or base32) / `FromTorrent` (SHA-1 of the bencoded `info` dict via a length-based scanner) / `FromPayload`. The universal key linking a delivery to a client's live torrent status | | **`extra`** | shared `extra.Int / StringSlice / String` helpers for the untyped `map[string]any` blobs in `Topic.Extra` and `Check.Extra` (added 2026-04-07; **use this instead of writing local helpers**) | @@ -91,6 +92,8 @@ techdebt/ Debt-tracking files (one per issue, see global rule) 4. `recordResult` — persists `next_check_at` (with exponential backoff on errors, capped at 6h) and writes the run summary metrics. +**Event emission:** the scheduler emits typed `events.Event`s via `events.Bus.Emit` at key points: `check.started`/`check.completed`/`check.failed` (per check cycle), `release.found` (per new torrent detected), `download.submitted` (after client send), and `session.expired` (on credential session loss). (`topic.added` is emitted separately by the topics HTTP handler on `POST /topics`, not by the scheduler.) The bus fans out to `topic_events` history table (all events persisted if policy allows), the notifier dispatcher (for event-type subscriptions), and an SSE seam (Phase 3 — publisher nil in Phase 1). + **Per-topic delivery:** `sendViaClient` passes `domain.AddOptions{DownloadDir: t.DownloadDir, Category: t.Category}` to the client plugin. Category is a **path segment, not a client-native label**: each client config carries an @@ -134,6 +137,10 @@ src/ │ ├── shared/ Reusable across pages │ │ ├── DeleteConfirm.tsx Two-click destructive confirm (uses useArmedConfirm) │ │ └── ResourceCard.tsx Slot-based card chrome for list pages +│ ├── topics/ Page-specific topic components +│ │ └── TopicEventsTimeline.tsx Per-topic event history feed (read-only) +│ ├── notifiers/ Page-specific notifier components +│ │ └── EventPicker.tsx Event type checkbox list (for notifier subscription) │ └── ui/ shadcn primitives — DO NOT hand-edit ├── hooks/ (legacy folder, mostly empty — prefer lib/hooks) ├── i18n/ en/ru dictionaries + useT hook @@ -143,6 +150,7 @@ src/ │ ├── prefs.ts zustand store: theme, locale, density │ ├── queryKeys.ts Centralised React Query key factory (QK) │ ├── utils.ts cn() helper +│ ├── events.ts Event type defs + `eventLabel()` i18n helper │ └── hooks/ │ ├── useArmedConfirm.ts Two-state idle⇄armed machine with timeout │ ├── useDebouncedValue.ts Generic debounce for query inputs diff --git a/backend/cmd/server/main.go b/backend/cmd/server/main.go index f57dd13..9106345 100644 --- a/backend/cmd/server/main.go +++ b/backend/cmd/server/main.go @@ -21,6 +21,7 @@ import ( "github.com/artyomsv/marauder/backend/internal/db" "github.com/artyomsv/marauder/backend/internal/db/repo" "github.com/artyomsv/marauder/backend/internal/domain" + "github.com/artyomsv/marauder/backend/internal/events" "github.com/artyomsv/marauder/backend/internal/logging" "github.com/artyomsv/marauder/backend/internal/notify" "github.com/artyomsv/marauder/backend/internal/scheduler" @@ -140,8 +141,10 @@ func run() error { } // Scheduler + topicEventsRepo := repo.NewTopicEvents(pool) disp := notify.New(notifiersRepo, master, logger) - sch := scheduler.New(cfg, logger, topicsRepo, clientsRepo, credsRepo, deliveriesRepo, master, disp) + bus := events.New(topicEventsRepo, disp, nil, logger) // SSE publisher nil until Phase 3 + sch := scheduler.New(cfg, logger, topicsRepo, clientsRepo, credsRepo, deliveriesRepo, master, bus) go func() { if err := sch.Start(rootCtx); err != nil { logger.Error().Err(err).Msg("scheduler exited with error") @@ -170,12 +173,14 @@ func run() error { Clients: clientsRepo, Notifiers: notifiersRepo, Creds: credsRepo, - Deliveries: deliveriesRepo, - Settings: settingsRepo, + Deliveries: deliveriesRepo, + TopicEvents: topicEventsRepo, + Settings: settingsRepo, Audit: auditRepo, AuditLog: auditLogger, OIDC: oidcProvider, Scheduler: sch, + Emit: bus.Emit, }) srv := &http.Server{ Addr: cfg.HTTPAddr, diff --git a/backend/internal/api/handlers/notifiers.go b/backend/internal/api/handlers/notifiers.go index fd6e18c..d1b8156 100644 --- a/backend/internal/api/handlers/notifiers.go +++ b/backend/internal/api/handlers/notifiers.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "net/http" + "sort" "github.com/go-chi/chi/v5" "github.com/google/uuid" @@ -12,6 +13,7 @@ import ( "github.com/artyomsv/marauder/backend/internal/crypto" "github.com/artyomsv/marauder/backend/internal/db/repo" "github.com/artyomsv/marauder/backend/internal/domain" + "github.com/artyomsv/marauder/backend/internal/events" "github.com/artyomsv/marauder/backend/internal/plugins/registry" "github.com/artyomsv/marauder/backend/internal/problem" ) @@ -54,6 +56,35 @@ func notifierToView(n *domain.Notifier) notifierView { } } +// validNotifierEvents keeps only events a notifier may subscribe to: +// the canonical notifiable set, plus the legacy "updated"/"error" keywords +// (still honored via dispatcher aliases). Empty input defaults to the full +// canonical notifiable set. Unknown/non-notifiable events are dropped. +func validNotifierEvents(in []string) []string { + allowed := map[string]bool{"updated": true, "error": true} + var canonical []string + for _, ty := range events.NotifiableTypes() { + allowed[string(ty)] = true + canonical = append(canonical, string(ty)) + } + sort.Strings(canonical) + if len(in) == 0 { + return canonical + } + out := make([]string, 0, len(in)) + seen := map[string]bool{} + for _, e := range in { + if allowed[e] && !seen[e] { + out = append(out, e) + seen[e] = true + } + } + if len(out) == 0 { + return canonical + } + return out +} + // List handles GET /notifiers. func (h *Notifiers) List(w http.ResponseWriter, r *http.Request) { uid, perr := currentUserID(r) @@ -111,17 +142,14 @@ func (h *Notifiers) Create(w http.ResponseWriter, r *http.Request) { problem.Write(w, r, h.BaseURL, problem.ErrInternal("encrypt: "+err.Error())) return } - events := req.Events - if len(events) == 0 { - events = []string{"updated", "error"} - } + evts := validNotifierEvents(req.Events) created, cerr := h.Notifiers.Create(r.Context(), &domain.Notifier{ UserID: uid, NotifierName: req.NotifierName, DisplayName: req.DisplayName, ConfigEnc: enc, ConfigNonce: nonce, - Events: events, + Events: evts, IsDefault: req.IsDefault, }) if cerr != nil { @@ -211,11 +239,8 @@ func (h *Notifiers) Update(w http.ResponseWriter, r *http.Request) { problem.Write(w, r, h.BaseURL, problem.ErrInternal("encrypt: "+eerr.Error())) return } - events := req.Events - if len(events) == 0 { - events = []string{"updated", "error"} - } - if err := h.Notifiers.Update(r.Context(), id, uid, existing.NotifierName, req.DisplayName, events, req.IsDefault, enc, nonce); err != nil { + evts := validNotifierEvents(req.Events) + if err := h.Notifiers.Update(r.Context(), id, uid, existing.NotifierName, req.DisplayName, evts, req.IsDefault, enc, nonce); err != nil { if errors.Is(err, repo.ErrNotFound) { problem.Write(w, r, h.BaseURL, problem.ErrNotFound("notifier not found")) return diff --git a/backend/internal/api/handlers/notifiers_handler_test.go b/backend/internal/api/handlers/notifiers_handler_test.go index b1e3aa0..d1d631b 100644 --- a/backend/internal/api/handlers/notifiers_handler_test.go +++ b/backend/internal/api/handlers/notifiers_handler_test.go @@ -108,3 +108,21 @@ func TestNotifiers_Update_PassesIsDefault(t *testing.T) { t.Errorf("Update not called with is_default=true (called=%v def=%v)", store.updateCalled, store.updateIsDef) } } + +func TestValidNotifierEvents_FiltersAndDefaults(t *testing.T) { + // empty -> full canonical notifiable set (5 entries) + if got := validNotifierEvents(nil); len(got) != 5 { + t.Errorf("default set size = %d, want 5", len(got)) + } + // drops legacy 'updated' is allowed-through (kept for back-compat) but + // unknown junk is dropped + got := validNotifierEvents([]string{"release.found", "bogus.event", "download.completed"}) + for _, e := range got { + if e == "bogus.event" { + t.Errorf("bogus event should be dropped: %v", got) + } + } + if len(got) != 2 { + t.Errorf("got %v, want [release.found download.completed]", got) + } +} diff --git a/backend/internal/api/handlers/topic_events.go b/backend/internal/api/handlers/topic_events.go new file mode 100644 index 0000000..a3da626 --- /dev/null +++ b/backend/internal/api/handlers/topic_events.go @@ -0,0 +1,76 @@ +package handlers + +import ( + "context" + "net/http" + "strconv" + + "github.com/go-chi/chi/v5" + "github.com/google/uuid" + + "github.com/artyomsv/marauder/backend/internal/domain" + "github.com/artyomsv/marauder/backend/internal/problem" +) + +// topicEventsStore is the consumer seam over *repo.TopicEvents. +type topicEventsStore interface { + ListForTopic(ctx context.Context, topicID, userID uuid.UUID, limit int, beforeID int64) ([]*domain.TopicEvent, error) +} + +// topicOwnerStore verifies topic ownership before returning its history. +type topicOwnerStore interface { + GetByID(ctx context.Context, id uuid.UUID, userID *uuid.UUID) (*domain.Topic, error) +} + +// TopicEvents handles GET /topics/{id}/events. +type TopicEvents struct { + Events topicEventsStore + Topics topicOwnerStore + BaseURL string +} + +type topicEventView struct { + ID int64 `json:"id"` + EventType string `json:"event_type"` + Severity string `json:"severity"` + Message string `json:"message"` + Data map[string]any `json:"data,omitempty"` + CreatedAt string `json:"created_at"` +} + +// List handles GET /topics/{id}/events?limit=&before=. +func (h *TopicEvents) List(w http.ResponseWriter, r *http.Request) { + uid, perr := currentUserID(r) + if perr != nil { + problem.Write(w, r, h.BaseURL, perr) + return + } + id, ierr := uuid.Parse(chi.URLParam(r, "id")) + if ierr != nil { + problem.Write(w, r, h.BaseURL, problem.ErrBadRequest("invalid id")) + return + } + if _, err := h.Topics.GetByID(r.Context(), id, &uid); err != nil { + problem.Write(w, r, h.BaseURL, problem.ErrNotFound("topic not found")) + return + } + limit, _ := strconv.Atoi(r.URL.Query().Get("limit")) + before, _ := strconv.ParseInt(r.URL.Query().Get("before"), 10, 64) + rows, err := h.Events.ListForTopic(r.Context(), id, uid, limit, before) + if err != nil { + problem.Write(w, r, h.BaseURL, problem.ErrInternal(err.Error())) + return + } + out := make([]topicEventView, 0, len(rows)) + for _, e := range rows { + out = append(out, topicEventView{ + ID: e.ID, + EventType: e.EventType, + Severity: e.Severity, + Message: e.Message, + Data: e.Data, + CreatedAt: e.CreatedAt.UTC().Format("2006-01-02T15:04:05Z"), + }) + } + writeJSON(w, http.StatusOK, map[string]any{"events": out}) +} diff --git a/backend/internal/api/handlers/topic_events_test.go b/backend/internal/api/handlers/topic_events_test.go new file mode 100644 index 0000000..181a323 --- /dev/null +++ b/backend/internal/api/handlers/topic_events_test.go @@ -0,0 +1,95 @@ +package handlers + +import ( + "context" + "encoding/json" + "errors" + "net/http/httptest" + "testing" + "time" + + "github.com/google/uuid" + + "github.com/artyomsv/marauder/backend/internal/domain" +) + +// fakeTopicEventsStore is a stub topicEventsStore for handler tests. +type fakeTopicEventsStore struct { + rows []*domain.TopicEvent + err error +} + +func (f *fakeTopicEventsStore) ListForTopic(_ context.Context, _, _ uuid.UUID, _ int, _ int64) ([]*domain.TopicEvent, error) { + return f.rows, f.err +} + +// fakeTopicOwner is a stub topicOwnerStore for handler tests. +type fakeTopicOwner struct { + ok bool +} + +func (f *fakeTopicOwner) GetByID(_ context.Context, _ uuid.UUID, _ *uuid.UUID) (*domain.Topic, error) { + if f.ok { + return &domain.Topic{ID: uuid.New()}, nil + } + return nil, errors.New("not found") +} + +// topicEventsResp mirrors the JSON shape the handler returns. +type topicEventsResp struct { + Events []struct { + ID int64 `json:"id"` + EventType string `json:"event_type"` + Severity string `json:"severity"` + Message string `json:"message"` + Data map[string]any `json:"data,omitempty"` + CreatedAt string `json:"created_at"` + } `json:"events"` +} + +// TestTopicEvents_List_ReturnsTopicHistory verifies that a GET on an owned +// topic returns 200 with events ordered as the store delivered them. +func TestTopicEvents_List_ReturnsTopicHistory(t *testing.T) { + tid := uuid.New() + store := &fakeTopicEventsStore{rows: []*domain.TopicEvent{ + {ID: 2, TopicID: tid, EventType: "release.found", Severity: "info", Message: "New release", CreatedAt: time.Now()}, + {ID: 1, TopicID: tid, EventType: "check.failed", Severity: "error", Message: "boom", CreatedAt: time.Now()}, + }} + h := &TopicEvents{Events: store, Topics: &fakeTopicOwner{ok: true}, BaseURL: ""} + + w := httptest.NewRecorder() + req := withURLParam(authedReq(t, uuid.New(), nil), "id", tid.String()) + h.List(w, req) + + if w.Code != 200 { + t.Fatalf("status %d, want 200; body %s", w.Code, w.Body.String()) + } + var resp topicEventsResp + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("decode: %v", err) + } + if len(resp.Events) != 2 { + t.Fatalf("got %d events, want 2", len(resp.Events)) + } + if resp.Events[0].EventType != "release.found" { + t.Errorf("events[0].event_type = %q, want release.found", resp.Events[0].EventType) + } + if resp.Events[1].EventType != "check.failed" { + t.Errorf("events[1].event_type = %q, want check.failed", resp.Events[1].EventType) + } +} + +// TestTopicEvents_List_NotOwnedTopic verifies that when GetByID errors (topic +// not found or not owned), the handler returns 404. +func TestTopicEvents_List_NotOwnedTopic(t *testing.T) { + store := &fakeTopicEventsStore{} + h := &TopicEvents{Events: store, Topics: &fakeTopicOwner{ok: false}, BaseURL: ""} + + w := httptest.NewRecorder() + req := withURLParam(authedReq(t, uuid.New(), nil), "id", uuid.New().String()) + h.List(w, req) + + if w.Code != 404 { + t.Fatalf("status %d, want 404; body %s", w.Code, w.Body.String()) + } +} diff --git a/backend/internal/api/handlers/topics.go b/backend/internal/api/handlers/topics.go index 7120b6d..ec733db 100644 --- a/backend/internal/api/handlers/topics.go +++ b/backend/internal/api/handlers/topics.go @@ -14,6 +14,7 @@ import ( "github.com/artyomsv/marauder/backend/internal/api/middleware" "github.com/artyomsv/marauder/backend/internal/db/repo" "github.com/artyomsv/marauder/backend/internal/domain" + "github.com/artyomsv/marauder/backend/internal/events" "github.com/artyomsv/marauder/backend/internal/plugins/registry" "github.com/artyomsv/marauder/backend/internal/problem" "github.com/artyomsv/marauder/backend/internal/topics" @@ -67,6 +68,9 @@ type Topics struct { Notifiers notifiersLookup Master configDecryptor BaseURL string + // Emit is an optional hook called after a topic is successfully created. + // Nil-safe: existing handler tests that don't set Emit continue to pass. + Emit func(ctx context.Context, ev events.Event) } type createTopicReq struct { @@ -151,7 +155,20 @@ func (h *Topics) Create(w http.ResponseWriter, r *http.Request) { "A topic for this URL already exists.")) return } - writeJSON(w, http.StatusCreated, res.Topic) + created := res.Topic + if h.Emit != nil { + h.Emit(r.Context(), events.Event{ + UserID: created.UserID, + TopicID: &created.ID, + NotifierID: created.NotifierID, + Type: events.TopicAdded, + Severity: "info", + Title: created.DisplayName, + Body: "Topic added", + Link: h.BaseURL + "/topics", + }) + } + writeJSON(w, http.StatusCreated, created) } // topicCreateProblem maps a topics.BuildAndCreate error to an RFC-7807 diff --git a/backend/internal/api/handlers/topics_handler_test.go b/backend/internal/api/handlers/topics_handler_test.go index 01415b6..e540638 100644 --- a/backend/internal/api/handlers/topics_handler_test.go +++ b/backend/internal/api/handlers/topics_handler_test.go @@ -10,6 +10,7 @@ import ( "github.com/artyomsv/marauder/backend/internal/db/repo" "github.com/artyomsv/marauder/backend/internal/domain" + "github.com/artyomsv/marauder/backend/internal/events" "github.com/artyomsv/marauder/backend/internal/plugins/registry" ) @@ -220,6 +221,30 @@ func TestTopicsUpdate_BadQuality(t *testing.T) { func intPtr(n int) *int { return &n } +// TestTopics_Create_EmitsTopicAdded asserts that a successful POST /topics +// calls h.Emit exactly once with events.TopicAdded. +func TestTopics_Create_EmitsTopicAdded(t *testing.T) { + var got []events.Event + store := &fakeTopicStore{} + h := &Topics{ + Topics: store, + BaseURL: "http://test", + Emit: func(_ context.Context, ev events.Event) { got = append(got, ev) }, + } + + body := createTopicReq{URL: "fake-create://topic/emit-test"} + w := httptest.NewRecorder() + req := authedReq(t, uuid.New(), body) + h.Create(w, req) + + if w.Code != http.StatusCreated { + t.Fatalf("status = %d, want 201; body=%s", w.Code, w.Body.String()) + } + if len(got) != 1 || got[0].Type != events.TopicAdded { + t.Fatalf("want one topic.added event, got %+v", got) + } +} + // --------------------------------------------------------------------------- // Fake ownership-validation lookups // --------------------------------------------------------------------------- diff --git a/backend/internal/api/router.go b/backend/internal/api/router.go index 70b7fa0..7fe0929 100644 --- a/backend/internal/api/router.go +++ b/backend/internal/api/router.go @@ -2,6 +2,7 @@ package api import ( + "context" "net/http" "time" @@ -19,6 +20,7 @@ import ( "github.com/artyomsv/marauder/backend/internal/config" "github.com/artyomsv/marauder/backend/internal/crypto" "github.com/artyomsv/marauder/backend/internal/db/repo" + "github.com/artyomsv/marauder/backend/internal/events" "github.com/artyomsv/marauder/backend/internal/scheduler" ) @@ -34,12 +36,16 @@ type Deps struct { Clients *repo.Clients Notifiers *repo.Notifiers Creds *repo.TrackerCredentials - Deliveries *repo.Deliveries - Settings *repo.Settings + Deliveries *repo.Deliveries + TopicEvents *repo.TopicEvents + Settings *repo.Settings Audit *repo.Audit AuditLog *audit.Logger OIDC *auth.OIDCProvider Scheduler *scheduler.Scheduler + // Emit is the events.Bus.Emit hook wired to the topics handler so it can + // publish topic.added on create. Nil-safe: omitting it disables emission. + Emit func(ctx context.Context, ev events.Event) } // NewRouter builds the HTTP handler tree. @@ -90,6 +96,12 @@ func NewRouter(d Deps) http.Handler { Notifiers: d.Notifiers, Master: d.Master, BaseURL: d.Cfg.PublicBaseURL, + Emit: d.Emit, + } + topicEventsH := &handlers.TopicEvents{ + Events: d.TopicEvents, + Topics: d.Topics, + BaseURL: d.Cfg.PublicBaseURL, } clientsH := &handlers.Clients{ Clients: d.Clients, @@ -142,6 +154,7 @@ func NewRouter(d Deps) http.Handler { r.Put("/topics/{id}", topicsH.Update) r.Delete("/topics/{id}", topicsH.Delete) r.Get("/topics/{id}/status", topicsH.Status) + r.Get("/topics/{id}/events", topicEventsH.List) r.Post("/topics/{id}/pause", topicsH.Pause) r.Post("/topics/{id}/resume", topicsH.Resume) diff --git a/backend/internal/db/repo/topic_events.go b/backend/internal/db/repo/topic_events.go new file mode 100644 index 0000000..a20e099 --- /dev/null +++ b/backend/internal/db/repo/topic_events.go @@ -0,0 +1,109 @@ +package repo + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/google/uuid" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" + "github.com/jackc/pgx/v5/pgxpool" + + "github.com/artyomsv/marauder/backend/internal/domain" +) + +// topicEventsPool is the minimal pgx surface used by TopicEvents. +type topicEventsPool interface { + QueryRow(ctx context.Context, sql string, args ...any) pgx.Row + Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error) + Exec(ctx context.Context, sql string, args ...any) (pgconn.CommandTag, error) +} + +// TopicEvents is the repository for topic_events — a topic's history feed. +type TopicEvents struct { + pool topicEventsPool +} + +// NewTopicEvents constructs the repository. +func NewTopicEvents(pool *pgxpool.Pool) *TopicEvents { + return &TopicEvents{pool: pool} +} + +// Record inserts a history row and returns its serial id. Data is marshalled +// to JSON for the jsonb column (nil Data → SQL NULL). +func (r *TopicEvents) Record(ctx context.Context, e *domain.TopicEvent) (int64, error) { + var data []byte + if e.Data != nil { + b, err := json.Marshal(e.Data) + if err != nil { + return 0, fmt.Errorf("topic_events: marshal data: %w", err) + } + data = b + } + const q = ` +INSERT INTO topic_events (topic_id, user_id, event_type, severity, message, data) +VALUES ($1, $2, $3, $4, $5, $6) +RETURNING id` + var id int64 + if err := r.pool.QueryRow(ctx, q, e.TopicID, e.UserID, e.EventType, e.Severity, e.Message, data).Scan(&id); err != nil { + return 0, fmt.Errorf("topic_events: record: %w", err) + } + return id, nil +} + +// ListForTopic returns a topic's events, newest first, capped at limit. +// beforeID==0 returns from the newest; a positive beforeID pages older. +func (r *TopicEvents) ListForTopic(ctx context.Context, topicID, userID uuid.UUID, limit int, beforeID int64) ([]*domain.TopicEvent, error) { + if limit <= 0 || limit > 200 { + limit = 50 + } + const q = ` +SELECT id, topic_id, user_id, event_type, severity, message, data, created_at +FROM topic_events +WHERE topic_id = $1 AND user_id = $2 AND ($3 = 0 OR id < $3) +ORDER BY id DESC +LIMIT $4` + rows, err := r.pool.Query(ctx, q, topicID, userID, beforeID, limit) + if err != nil { + return nil, fmt.Errorf("topic_events: list: %w", err) + } + defer rows.Close() + return scanTopicEvents(rows) +} + +// ListForUserSince returns a user's events with id > sinceID, oldest first. +// Used by the Phase 3 SSE reconnect replay. +func (r *TopicEvents) ListForUserSince(ctx context.Context, userID uuid.UUID, sinceID int64) ([]*domain.TopicEvent, error) { + const q = ` +SELECT id, topic_id, user_id, event_type, severity, message, data, created_at +FROM topic_events +WHERE user_id = $1 AND id > $2 +ORDER BY id ASC +LIMIT 200` + rows, err := r.pool.Query(ctx, q, userID, sinceID) + if err != nil { + return nil, fmt.Errorf("topic_events: list since: %w", err) + } + defer rows.Close() + return scanTopicEvents(rows) +} + +func scanTopicEvents(rows pgx.Rows) ([]*domain.TopicEvent, error) { + var out []*domain.TopicEvent + for rows.Next() { + var e domain.TopicEvent + var data []byte + var createdAt time.Time + if err := rows.Scan(&e.ID, &e.TopicID, &e.UserID, &e.EventType, &e.Severity, &e.Message, &data, &createdAt); err != nil { + return nil, fmt.Errorf("topic_events: scan: %w", err) + } + if len(data) > 0 { + _ = json.Unmarshal(data, &e.Data) + } + e.CreatedAt = createdAt + out = append(out, &e) + } + return out, rows.Err() +} diff --git a/backend/internal/db/repo/topic_events_test.go b/backend/internal/db/repo/topic_events_test.go new file mode 100644 index 0000000..ef1ed8d --- /dev/null +++ b/backend/internal/db/repo/topic_events_test.go @@ -0,0 +1,66 @@ +package repo + +import ( + "context" + "encoding/json" + "testing" + "time" + + "github.com/google/uuid" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" + + "github.com/artyomsv/marauder/backend/internal/domain" +) + +// fakeRow implements pgx.Row for QueryRow-based Record. +type fakeRow struct{ id int64 } + +func (r fakeRow) Scan(dest ...any) error { + *(dest[0].(*int64)) = r.id + return nil +} + +type fakeTEPool struct { + lastSQL string + lastArgs []any + row fakeRow +} + +func (f *fakeTEPool) QueryRow(_ context.Context, sql string, args ...any) pgx.Row { + f.lastSQL = sql + f.lastArgs = args + return f.row +} +func (f *fakeTEPool) Query(_ context.Context, _ string, _ ...any) (pgx.Rows, error) { + return nil, nil +} +func (f *fakeTEPool) Exec(_ context.Context, _ string, _ ...any) (pgconn.CommandTag, error) { + return pgconn.CommandTag{}, nil +} + +func TestTopicEvents_Record_ReturnsID_AndMarshalsData(t *testing.T) { + pool := &fakeTEPool{row: fakeRow{id: 7}} + r := &TopicEvents{pool: pool} + tid, uid := uuid.New(), uuid.New() + id, err := r.Record(context.Background(), &domain.TopicEvent{ + TopicID: tid, UserID: uid, EventType: "release.found", Severity: "info", + Message: "New release", Data: map[string]any{"labels": []string{"s01e01"}}, + CreatedAt: time.Now(), + }) + if err != nil { + t.Fatalf("Record: %v", err) + } + if id != 7 { + t.Errorf("id = %d, want 7", id) + } + // data arg must be JSON bytes, not a raw map (pgx can't encode map->jsonb directly here) + raw, ok := pool.lastArgs[5].([]byte) + if !ok { + t.Fatalf("data arg type %T, want []byte", pool.lastArgs[5]) + } + var back map[string]any + if err := json.Unmarshal(raw, &back); err != nil { + t.Fatalf("data not valid JSON: %v", err) + } +} diff --git a/backend/internal/events/bus.go b/backend/internal/events/bus.go new file mode 100644 index 0000000..a52a878 --- /dev/null +++ b/backend/internal/events/bus.go @@ -0,0 +1,92 @@ +package events + +import ( + "context" + + "github.com/google/uuid" + "github.com/rs/zerolog" + + "github.com/artyomsv/marauder/backend/internal/domain" +) + +// Event is one thing that happened, tagged with its canonical Type. +type Event struct { + UserID uuid.UUID + TopicID *uuid.UUID // nil for non-topic events (e.g. credential session.expired) + NotifierID *uuid.UUID // per-topic notifier override; nil => the user's default notifiers + Type Type + Severity string // info | warn | error (defaults to "info" when empty) + Title string + Body string + Link string + Data map[string]any +} + +// Recorder persists an event to the topic_events history table. +type Recorder interface { + Record(ctx context.Context, e *domain.TopicEvent) (int64, error) +} + +// Notifier fans an event out to a user's notifier plugins. Implemented by +// *notify.Dispatcher. The scheduler already proved this seam shape. +type Notifier interface { + SendVia(ctx context.Context, userID uuid.UUID, notifierID *uuid.UUID, event string, msg domain.Message) int +} + +// Publisher pushes an event onto the live SSE feed. Phase 1 wires nil; the +// Phase 3 hub implements it. id is the persisted history id (0 if ephemeral). +type Publisher interface { + Publish(userID uuid.UUID, ev Event, id int64) +} + +// Bus is the single event->sinks fan-out point. Every sink is optional +// (nil-safe) so the bus is cheap to construct in tests and across phases. +type Bus struct { + rec Recorder + notif Notifier + pub Publisher + log zerolog.Logger +} + +// New constructs a Bus. Any of rec/notif/pub may be nil. +func New(rec Recorder, notif Notifier, pub Publisher, log zerolog.Logger) *Bus { + return &Bus{rec: rec, notif: notif, pub: pub, log: log.With().Str("component", "events").Logger()} +} + +// Emit routes ev to its policy-selected sinks. Best-effort: every sink +// failure is logged and never propagated — emitting an event must never +// break the caller's flow. +func (b *Bus) Emit(ctx context.Context, ev Event) { + if ev.Severity == "" { + ev.Severity = "info" + } + p := PolicyFor(ev.Type) + + var id int64 + if p.Persist && b.rec != nil && ev.TopicID != nil { + rec := &domain.TopicEvent{ + TopicID: *ev.TopicID, + UserID: ev.UserID, + EventType: string(ev.Type), + Severity: ev.Severity, + Message: ev.Title, + Data: ev.Data, + } + got, err := b.rec.Record(ctx, rec) + if err != nil { + b.log.Warn().Err(err).Str("type", string(ev.Type)).Msg("events: record failed") + } else { + id = got + } + } + + if p.Notifiable && b.notif != nil { + b.notif.SendVia(ctx, ev.UserID, ev.NotifierID, string(ev.Type), domain.Message{ + Title: ev.Title, Body: ev.Body, Link: ev.Link, + }) + } + + if p.SSE && b.pub != nil { + b.pub.Publish(ev.UserID, ev, id) + } +} diff --git a/backend/internal/events/bus_test.go b/backend/internal/events/bus_test.go new file mode 100644 index 0000000..9433522 --- /dev/null +++ b/backend/internal/events/bus_test.go @@ -0,0 +1,97 @@ +package events + +import ( + "context" + "testing" + + "github.com/google/uuid" + "github.com/rs/zerolog" + + "github.com/artyomsv/marauder/backend/internal/domain" +) + +type fakeRecorder struct { + got *domain.TopicEvent + retID int64 + called int +} + +func (f *fakeRecorder) Record(_ context.Context, e *domain.TopicEvent) (int64, error) { + f.called++ + f.got = e + return f.retID, nil +} + +type fakeNotifier struct { + event string + notifierID *uuid.UUID + called int +} + +func (f *fakeNotifier) SendVia(_ context.Context, _ uuid.UUID, nid *uuid.UUID, event string, _ domain.Message) int { + f.called++ + f.event = event + f.notifierID = nid + return 1 +} + +type fakePublisher struct { + called int + id int64 +} + +func (f *fakePublisher) Publish(_ uuid.UUID, _ Event, id int64) { f.called++; f.id = id } + +func newBus(t *testing.T) (*Bus, *fakeRecorder, *fakeNotifier, *fakePublisher) { + t.Helper() + rec := &fakeRecorder{retID: 42} + notif := &fakeNotifier{} + pub := &fakePublisher{} + return New(rec, notif, pub, zerolog.Nop()), rec, notif, pub +} + +func TestEmit_ReleaseFound_PersistsNotifiesAndPublishes(t *testing.T) { + bus, rec, notif, pub := newBus(t) + tid := uuid.New() + bus.Emit(context.Background(), Event{ + UserID: uuid.New(), TopicID: &tid, Type: ReleaseFound, + Severity: "info", Title: "X", Body: "Y", + }) + if rec.called != 1 { + t.Errorf("recorder called %d, want 1", rec.called) + } + if rec.got.EventType != string(ReleaseFound) { + t.Errorf("recorded type %s, want release.found", rec.got.EventType) + } + if notif.called != 1 || notif.event != string(ReleaseFound) { + t.Errorf("notifier called %d event %q", notif.called, notif.event) + } + if pub.called != 1 || pub.id != 42 { + t.Errorf("publisher called %d id %d, want 1/42", pub.called, pub.id) + } +} + +func TestEmit_CheckStarted_PublishesOnly(t *testing.T) { + bus, rec, notif, pub := newBus(t) + tid := uuid.New() + bus.Emit(context.Background(), Event{UserID: uuid.New(), TopicID: &tid, Type: CheckStarted}) + if rec.called != 0 { + t.Errorf("recorder called %d, want 0", rec.called) + } + if notif.called != 0 { + t.Errorf("notifier called %d, want 0", notif.called) + } + if pub.called != 1 { + t.Errorf("publisher called %d, want 1", pub.called) + } + if pub.id != 0 { + t.Errorf("ephemeral publish id %d, want 0", pub.id) + } +} + +func TestEmit_NilSeams_NoPanic(t *testing.T) { + bus := New(nil, nil, nil, zerolog.Nop()) + tid := uuid.New() + bus.Emit(context.Background(), Event{UserID: uuid.New(), TopicID: &tid, Type: ReleaseFound}) + // no panic = pass +} diff --git a/backend/internal/events/event.go b/backend/internal/events/event.go new file mode 100644 index 0000000..9a8d0fa --- /dev/null +++ b/backend/internal/events/event.go @@ -0,0 +1,56 @@ +// Package events defines the canonical event taxonomy emitted across the +// backend and the per-type policy that decides, for each event, whether it +// is persisted to history, eligible for notifier fan-out, and pushed over +// the live SSE feed. It is the single source of truth for "what happens to +// an event". +package events + +// Type is a canonical event-type identifier. These strings are a one-way +// door: notifier subscriptions persist them, so they must not be renamed. +type Type string + +const ( + TopicAdded Type = "topic.added" + CheckStarted Type = "check.started" + CheckCompleted Type = "check.completed" + ReleaseFound Type = "release.found" + DownloadSubmitted Type = "download.submitted" + DownloadProgress Type = "download.progress" + DownloadCompleted Type = "download.completed" + CheckFailed Type = "check.failed" + SessionExpired Type = "session.expired" +) + +// Policy describes the routing for one event type. +type Policy struct { + Persist bool // write a topic_events history row + Notifiable bool // eligible for notifier fan-out (subject to subscription) + SSE bool // push over the live feed +} + +var policies = map[Type]Policy{ + TopicAdded: {Persist: true, Notifiable: false, SSE: true}, + CheckStarted: {Persist: false, Notifiable: false, SSE: true}, + CheckCompleted: {Persist: false, Notifiable: false, SSE: true}, + ReleaseFound: {Persist: true, Notifiable: true, SSE: true}, + DownloadSubmitted: {Persist: true, Notifiable: true, SSE: true}, + DownloadProgress: {Persist: false, Notifiable: false, SSE: true}, + DownloadCompleted: {Persist: true, Notifiable: true, SSE: true}, + CheckFailed: {Persist: true, Notifiable: true, SSE: true}, + SessionExpired: {Persist: true, Notifiable: true, SSE: true}, +} + +// PolicyFor returns the routing policy for t. An unknown type is inert +// (no persist, no notify, no SSE) — a defensive default. +func PolicyFor(t Type) Policy { return policies[t] } + +// NotifiableTypes returns the event types a notifier may subscribe to. +func NotifiableTypes() []Type { + var out []Type + for t, p := range policies { + if p.Notifiable { + out = append(out, t) + } + } + return out +} diff --git a/backend/internal/events/event_test.go b/backend/internal/events/event_test.go new file mode 100644 index 0000000..e445e87 --- /dev/null +++ b/backend/internal/events/event_test.go @@ -0,0 +1,49 @@ +package events + +import "testing" + +func TestPolicyFor(t *testing.T) { + tests := []struct { + typ Type + persist, notify, sse bool + }{ + {TopicAdded, true, false, true}, + {CheckStarted, false, false, true}, + {CheckCompleted, false, false, true}, + {ReleaseFound, true, true, true}, + {DownloadSubmitted, true, true, true}, + {DownloadProgress, false, false, true}, + {DownloadCompleted, true, true, true}, + {CheckFailed, true, true, true}, + {SessionExpired, true, true, true}, + } + for _, tt := range tests { + t.Run(string(tt.typ), func(t *testing.T) { + p := PolicyFor(tt.typ) + if p.Persist != tt.persist || p.Notifiable != tt.notify || p.SSE != tt.sse { + t.Errorf("PolicyFor(%s) = %+v, want persist=%v notify=%v sse=%v", + tt.typ, p, tt.persist, tt.notify, tt.sse) + } + }) + } +} + +func TestPolicyFor_Unknown_DefaultsToInert(t *testing.T) { + p := PolicyFor(Type("nope.nope")) + if p.Persist || p.Notifiable || p.SSE { + t.Errorf("unknown type should be inert, got %+v", p) + } +} + +func TestNotifiableTypes(t *testing.T) { + got := NotifiableTypes() + want := map[Type]bool{ReleaseFound: true, DownloadSubmitted: true, DownloadCompleted: true, CheckFailed: true, SessionExpired: true} + if len(got) != len(want) { + t.Fatalf("got %d notifiable types, want %d", len(got), len(want)) + } + for _, ty := range got { + if !want[ty] { + t.Errorf("unexpected notifiable type %s", ty) + } + } +} diff --git a/backend/internal/notify/dispatcher.go b/backend/internal/notify/dispatcher.go index 5610771..01673d0 100644 --- a/backend/internal/notify/dispatcher.go +++ b/backend/internal/notify/dispatcher.go @@ -115,10 +115,20 @@ func (d *Dispatcher) sendOne(ctx context.Context, n *domain.Notifier, event stri return true } +// legacyAliases maps a pre-taxonomy subscription keyword to the canonical +// event types it now covers, so notifiers created before per-event +// subscription (events = ['updated','error']) keep delivering. "updated" is +// intentionally broad — new releases, client submissions, and completions. +var legacyAliases = map[string][]string{ + "updated": {"release.found", "download.submitted", "download.completed"}, + "error": {"check.failed", "session.expired"}, +} + // subscribed reports whether a notifier with the given event subscription // list should receive an event. An empty list (or empty event) means "all // events" — a defensive default so a notifier created before event // filtering, or a caller that doesn't categorise, still delivers. +// A subscription entry matches directly, or via its legacy alias expansion. func subscribed(events []string, event string) bool { if len(events) == 0 || event == "" { return true @@ -127,6 +137,11 @@ func subscribed(events []string, event string) bool { if e == event { return true } + for _, alias := range legacyAliases[e] { + if alias == event { + return true + } + } } return false } diff --git a/backend/internal/notify/dispatcher_test.go b/backend/internal/notify/dispatcher_test.go index f1d1655..6b3e73b 100644 --- a/backend/internal/notify/dispatcher_test.go +++ b/backend/internal/notify/dispatcher_test.go @@ -316,3 +316,39 @@ func TestSendVia_NilID_DefaultRespectsSubscription(t *testing.T) { t.Errorf("want 0 (default not subscribed to updated), got %d", got) } } + +func TestSubscribed_LegacyUpdated_MatchesNewReleaseEvents(t *testing.T) { + legacy := []string{"updated", "error"} + for _, ev := range []string{"release.found", "download.submitted", "download.completed"} { + if !subscribed(legacy, ev) { + t.Errorf("legacy 'updated' should match %s", ev) + } + } +} + +func TestSubscribed_LegacyError_MatchesErrorEvents(t *testing.T) { + legacy := []string{"error"} + for _, ev := range []string{"check.failed", "session.expired"} { + if !subscribed(legacy, ev) { + t.Errorf("legacy 'error' should match %s", ev) + } + } + if subscribed(legacy, "release.found") { + t.Error("legacy 'error' must NOT match release.found") + } +} + +func TestSubscribed_Canonical_DirectMatch(t *testing.T) { + if !subscribed([]string{"download.completed"}, "download.completed") { + t.Error("canonical event should match itself") + } + if subscribed([]string{"download.completed"}, "release.found") { + t.Error("canonical subscription must not match a different event") + } +} + +func TestSubscribed_EmptyMeansAll(t *testing.T) { + if !subscribed(nil, "anything") { + t.Error("empty subscription should match all") + } +} diff --git a/backend/internal/scheduler/scheduler.go b/backend/internal/scheduler/scheduler.go index f7a43ea..4262fcb 100644 --- a/backend/internal/scheduler/scheduler.go +++ b/backend/internal/scheduler/scheduler.go @@ -34,6 +34,7 @@ import ( "github.com/artyomsv/marauder/backend/internal/crypto" "github.com/artyomsv/marauder/backend/internal/db/repo" "github.com/artyomsv/marauder/backend/internal/domain" + "github.com/artyomsv/marauder/backend/internal/events" "github.com/artyomsv/marauder/backend/internal/extra" "github.com/artyomsv/marauder/backend/internal/infohash" "github.com/artyomsv/marauder/backend/internal/metrics" @@ -83,14 +84,11 @@ type credentialsRepo interface { MarkSessionExpired(ctx context.Context, id, userID uuid.UUID) (bool, error) } -// eventNotifier is the subset of *notify.Dispatcher that the scheduler -// uses to fire notifications (a new-release "updated" alert, a one-time -// session-expiry "error" alert). Defined as an interface so the scheduler -// package does not need to import notify (avoiding any potential import -// cycle). The dispatcher filters by each notifier's event subscription. -type eventNotifier interface { - Send(ctx context.Context, userID uuid.UUID, event string, msg domain.Message) int - SendVia(ctx context.Context, userID uuid.UUID, notifierID *uuid.UUID, event string, msg domain.Message) int +// emitter is the subset of *events.Bus the scheduler uses to publish typed +// lifecycle events. Defined as an interface so the scheduler stays +// unit-testable without the bus, and nil-safe in tests that ignore events. +type emitter interface { + Emit(ctx context.Context, ev events.Event) } // decryptor is the subset of *crypto.MasterKey that the scheduler uses. @@ -132,7 +130,7 @@ type Scheduler struct { creds credentialsRepo deliveries deliveriesRecorder // nil-safe; records what was pushed to a client master decryptor - notifier eventNotifier // nil-safe; fires new-release + session-expiry alerts + emit emitter // nil-safe; publishes typed lifecycle events // Test seams (default to registry.GetTracker / registry.GetClient). lookupTracker trackerLookupFn @@ -153,7 +151,7 @@ type Scheduler struct { } // New constructs a scheduler. -func New(cfg *config.Config, log zerolog.Logger, topics *repo.Topics, clients *repo.Clients, creds *repo.TrackerCredentials, deliveries *repo.Deliveries, master *crypto.MasterKey, notifier eventNotifier) *Scheduler { +func New(cfg *config.Config, log zerolog.Logger, topics *repo.Topics, clients *repo.Clients, creds *repo.TrackerCredentials, deliveries *repo.Deliveries, master *crypto.MasterKey, emit emitter) *Scheduler { return &Scheduler{ cfg: cfg, log: log.With().Str("component", "scheduler").Logger(), @@ -162,7 +160,7 @@ func New(cfg *config.Config, log zerolog.Logger, topics *repo.Topics, clients *r creds: creds, deliveries: deliveries, master: master, - notifier: notifier, + emit: emit, lookupTracker: registry.GetTracker, lookupClient: registry.GetClient, jobs: make(chan *domain.Topic, cfg.SchedulerWorkers*4), @@ -341,6 +339,11 @@ func (s *Scheduler) runCheck(ctx context.Context, log zerolog.Logger, t *domain. return } + // Emit check.started once the tracker plugin is confirmed present. + if s.emit != nil { + s.emit.Emit(ctx, events.Event{UserID: t.UserID, TopicID: &t.ID, Type: events.CheckStarted}) + } + // checkCtx covers credential decryption, login, and the initial // Check call. The per-episode Download loop allocates its own // per-iteration context with the same TrackerHTTPTimeout so each @@ -371,6 +374,16 @@ func (s *Scheduler) runCheck(ctx context.Context, log zerolog.Logger, t *domain. log.Info().Str("old_hash", t.LastHash).Str("new_hash", check.Hash).Msg("topic updated") metrics.TrackerUpdatesTotal.WithLabelValues(t.TrackerName).Inc() + // Emit release.found once per new hash, before draining episodes. + if s.emit != nil { + s.emit.Emit(ctx, events.Event{ + UserID: t.UserID, TopicID: &t.ID, NotifierID: t.NotifierID, + Type: events.ReleaseFound, Severity: "info", + Title: t.DisplayName, Body: "New release detected", + Link: s.cfg.PublicBaseURL + "/topics", + }) + } + var dlErr error delivered, dlErr = s.downloadAllPending(ctx, log, t, tr, check, creds) anySubmitted = len(delivered) > 0 @@ -416,15 +429,22 @@ func (s *Scheduler) runCheck(ctx context.Context, log zerolog.Logger, t *domain. } metrics.SchedulerTopicChecksTotal.WithLabelValues(t.TrackerName, "ok").Inc() - s.recordResult(ctx, log, t.ID, check.Hash, updated || anySubmitted, s.backoff(t, false, nil), "") + nextCheckAt := s.backoff(t, false, nil) + if s.emit != nil { + s.emit.Emit(ctx, events.Event{ + UserID: t.UserID, TopicID: &t.ID, Type: events.CheckCompleted, + Data: map[string]any{"next_check_at": nextCheckAt.UTC().Format(time.RFC3339)}, + }) + } + s.recordResult(ctx, log, t.ID, check.Hash, updated || anySubmitted, nextCheckAt, "") s.recordChecked(updated || anySubmitted, false) } -// notifyUpdated fires a single "new release" notification summarising what -// was delivered this tick. Best-effort: a nil notifier or zero deliveries -// is a no-op, and the dispatcher itself swallows per-notifier failures. +// notifyUpdated emits a download.submitted event summarising what was +// delivered this tick. Best-effort: a nil emitter or zero deliveries is a +// no-op. func (s *Scheduler) notifyUpdated(ctx context.Context, t *domain.Topic, labels []string) { - if s.notifier == nil || len(labels) == 0 { + if s.emit == nil || len(labels) == 0 { return } const maxList = 10 @@ -438,28 +458,27 @@ func (s *Scheduler) notifyUpdated(ctx context.Context, t *domain.Topic, labels [ if overflow > 0 { body += fmt.Sprintf(" (+%d more)", overflow) } - s.notifier.SendVia(ctx, t.UserID, t.NotifierID, "updated", domain.Message{ - Title: t.DisplayName, - Body: body, - Link: s.cfg.PublicBaseURL + "/topics", + s.emit.Emit(ctx, events.Event{ + UserID: t.UserID, TopicID: &t.ID, NotifierID: t.NotifierID, + Type: events.DownloadSubmitted, Severity: "info", + Title: t.DisplayName, Body: body, Link: s.cfg.PublicBaseURL + "/topics", }) } -// notifyError fires a one-shot "error" notification when a topic first enters -// the error state (tracker check, download/client, or missing-plugin failure). -// Deduped by the pre-check ConsecutiveErrors snapshot: only the first failure -// (count 0) notifies, so a topic retrying on its backoff schedule doesn't spam -// every tick. Routed via the topic's notifier override or the user's default -// notifiers — same path as new-release alerts — and best-effort (nil notifier -// is a no-op; the dispatcher honours each notifier's "error" subscription). +// notifyError emits a check.failed event when a topic first enters the error +// state (tracker check, download/client, or missing-plugin failure). Deduped +// by the pre-check ConsecutiveErrors snapshot: only the first failure (count +// 0) emits, so a topic retrying on its backoff schedule doesn't spam every +// tick. func (s *Scheduler) notifyError(ctx context.Context, t *domain.Topic, errMsg string) { - if s.notifier == nil || t.ConsecutiveErrors > 0 { + if s.emit == nil || t.ConsecutiveErrors > 0 { return } - s.notifier.SendVia(ctx, t.UserID, t.NotifierID, "error", domain.Message{ - Title: "Topic check failed: " + t.DisplayName, - Body: errMsg, - Link: s.cfg.PublicBaseURL + "/topics", + s.emit.Emit(ctx, events.Event{ + UserID: t.UserID, TopicID: &t.ID, NotifierID: t.NotifierID, + Type: events.CheckFailed, Severity: "error", + Title: "Topic check failed: " + t.DisplayName, Body: errMsg, + Link: s.cfg.PublicBaseURL + "/topics", }) } @@ -507,8 +526,10 @@ func (s *Scheduler) loadCredentials(ctx context.Context, checkCtx context.Contex if merr != nil { log.Warn().Err(merr).Msg("mark session expired failed") } - if transitioned && s.notifier != nil { - s.notifier.Send(ctx, stored.UserID, "error", domain.Message{ + if transitioned && s.emit != nil { + s.emit.Emit(ctx, events.Event{ + UserID: stored.UserID, TopicID: &t.ID, NotifierID: t.NotifierID, + Type: events.SessionExpired, Severity: "error", Title: "Tracker session expired", Body: t.TrackerName + " needs re-authentication — solve the captcha in Marauder.", Link: s.cfg.PublicBaseURL + "/credentials", diff --git a/backend/internal/scheduler/scheduler_test.go b/backend/internal/scheduler/scheduler_test.go index 8a8d3c9..8156b93 100644 --- a/backend/internal/scheduler/scheduler_test.go +++ b/backend/internal/scheduler/scheduler_test.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "strings" + "sync" "testing" "time" @@ -14,6 +15,7 @@ import ( "github.com/artyomsv/marauder/backend/internal/config" "github.com/artyomsv/marauder/backend/internal/domain" + "github.com/artyomsv/marauder/backend/internal/events" "github.com/artyomsv/marauder/backend/internal/plugins/registry" ) @@ -175,31 +177,44 @@ func (f *fakeCredsSession) MarkSessionExpired(_ context.Context, _, _ uuid.UUID) return f.markExpiredWon, f.markExpiredErr } -// fakeNotifier records Send / SendVia calls. -type fakeNotifier struct { - calls int - lastID uuid.UUID - lastEvent string - lastMsg domain.Message - lastNotifierID *uuid.UUID +// fakeEmitter records every Emit call for assertions. +type fakeEmitter struct { + mu sync.Mutex + events []events.Event } -func (f *fakeNotifier) Send(_ context.Context, userID uuid.UUID, event string, msg domain.Message) int { - f.calls++ - f.lastID = userID - f.lastEvent = event - f.lastMsg = msg - f.lastNotifierID = nil - return 1 +func (f *fakeEmitter) Emit(_ context.Context, ev events.Event) { + f.mu.Lock() + defer f.mu.Unlock() + f.events = append(f.events, ev) } -func (f *fakeNotifier) SendVia(_ context.Context, userID uuid.UUID, notifierID *uuid.UUID, event string, msg domain.Message) int { - f.calls++ - f.lastID = userID - f.lastEvent = event - f.lastMsg = msg - f.lastNotifierID = notifierID - return 1 +func (f *fakeEmitter) types() []events.Type { + f.mu.Lock() + defer f.mu.Unlock() + var out []events.Type + for _, e := range f.events { + out = append(out, e.Type) + } + return out +} + +func (f *fakeEmitter) ofType(tp events.Type) []events.Event { + f.mu.Lock() + defer f.mu.Unlock() + var out []events.Event + for _, e := range f.events { + if e.Type == tp { + out = append(out, e) + } + } + return out +} + +func (f *fakeEmitter) count() int { + f.mu.Lock() + defer f.mu.Unlock() + return len(f.events) } // fakeDecryptor returns its input unchanged. @@ -246,7 +261,7 @@ type fixture struct { atomicTopics *fakeTopicsAtomic clientPlugin *fakeClientPlugin deliveries *fakeDeliveries - notifier *fakeNotifier + emitter *fakeEmitter tracker *fakeTracker topic *domain.Topic } @@ -285,7 +300,7 @@ func newFixture(t *testing.T, tracker *fakeTracker, atomic bool) *fixture { } deliveries := &fakeDeliveries{} - notifier := &fakeNotifier{} + emit := &fakeEmitter{} s := &Scheduler{ cfg: cfg, log: zerolog.New(io.Discard), @@ -293,7 +308,7 @@ func newFixture(t *testing.T, tracker *fakeTracker, atomic bool) *fixture { clients: &fakeClients{client: client}, creds: &fakeCreds{}, deliveries: deliveries, - notifier: notifier, + emit: emit, master: &fakeDecryptor{}, lookupTracker: func(name string) registry.Tracker { return tracker }, lookupClient: func(name string) registry.Client { return clientPlugin }, @@ -322,7 +337,7 @@ func newFixture(t *testing.T, tracker *fakeTracker, atomic bool) *fixture { atomicTopics: atomicImpl, clientPlugin: clientPlugin, deliveries: deliveries, - notifier: notifier, + emitter: emit, tracker: tracker, topic: topic, } @@ -449,26 +464,25 @@ func TestRunCheck_SinglePayload_NotifiesUpdated(t *testing.T) { f.s.runCheck(context.Background(), f.s.log, f.topic) - if f.notifier.calls != 1 { - t.Fatalf("expected 1 notification, got %d", f.notifier.calls) + evs := f.emitter.ofType(events.DownloadSubmitted) + if len(evs) != 1 { + t.Fatalf("expected 1 download.submitted event, got %d", len(evs)) } - if f.notifier.lastEvent != "updated" { - t.Errorf("event = %q, want updated", f.notifier.lastEvent) - } - if f.notifier.lastID != f.topic.UserID { - t.Errorf("notification sent to wrong user") + ev := evs[0] + if ev.UserID != f.topic.UserID { + t.Errorf("event UserID mismatch") } // Single-payload topics summarise with the topic display name. - if !strings.Contains(f.notifier.lastMsg.Body, "Fake Topic") { - t.Errorf("body = %q, want it to mention the topic name", f.notifier.lastMsg.Body) + if !strings.Contains(ev.Body, "Fake Topic") { + t.Errorf("body = %q, want it to mention the topic name", ev.Body) } // The notification fires when the torrent is handed to the client (download // START), not when it finishes. The body must not claim completion. - if strings.Contains(f.notifier.lastMsg.Body, "Downloaded") { - t.Errorf("body = %q, must not claim the torrent finished downloading", f.notifier.lastMsg.Body) + if strings.Contains(ev.Body, "Downloaded") { + t.Errorf("body = %q, must not claim the torrent finished downloading", ev.Body) } - if !strings.Contains(f.notifier.lastMsg.Body, "Sent to client") { - t.Errorf("body = %q, want it to say the release was sent to the client", f.notifier.lastMsg.Body) + if !strings.Contains(ev.Body, "Sent to client") { + t.Errorf("body = %q, want it to say the release was sent to the client", ev.Body) } } @@ -481,8 +495,8 @@ func TestRunCheck_NoDownload_DoesNotNotify(t *testing.T) { f.s.runCheck(context.Background(), f.s.log, f.topic) - if f.notifier.calls != 0 { - t.Errorf("expected no notification when nothing downloaded, got %d", f.notifier.calls) + if evs := f.emitter.ofType(events.DownloadSubmitted); len(evs) != 0 { + t.Errorf("expected no download.submitted event when nothing downloaded, got %d", len(evs)) } } @@ -495,20 +509,19 @@ func TestRunCheck_CheckError_NotifiesError(t *testing.T) { f.s.runCheck(context.Background(), f.s.log, f.topic) - if f.notifier.calls != 1 { - t.Fatalf("expected 1 error notification on first failure, got %d", f.notifier.calls) - } - if f.notifier.lastEvent != "error" { - t.Errorf("event = %q, want error", f.notifier.lastEvent) + evs := f.emitter.ofType(events.CheckFailed) + if len(evs) != 1 { + t.Fatalf("expected 1 check.failed event on first failure, got %d", len(evs)) } - if f.notifier.lastID != f.topic.UserID { - t.Errorf("error notification sent to wrong user") + ev := evs[0] + if ev.UserID != f.topic.UserID { + t.Errorf("check.failed event UserID mismatch") } - if f.notifier.lastNotifierID != f.topic.NotifierID { - t.Errorf("error notification did not route via the topic's notifier") + if ev.NotifierID != f.topic.NotifierID { + t.Errorf("check.failed event did not carry the topic's notifier override") } - if !strings.Contains(f.notifier.lastMsg.Body, "tracker boom") { - t.Errorf("body = %q, want it to include the underlying error", f.notifier.lastMsg.Body) + if !strings.Contains(ev.Body, "tracker boom") { + t.Errorf("body = %q, want it to include the underlying error", ev.Body) } } @@ -522,8 +535,8 @@ func TestRunCheck_CheckError_AlreadyErrored_NoNotify(t *testing.T) { f.s.runCheck(context.Background(), f.s.log, f.topic) - if f.notifier.calls != 0 { - t.Errorf("expected no error notification on a repeat failure, got %d", f.notifier.calls) + if evs := f.emitter.ofType(events.CheckFailed); len(evs) != 0 { + t.Errorf("expected no check.failed event on a repeat failure, got %d", len(evs)) } } @@ -553,10 +566,11 @@ func TestRunCheck_Episodes_NotifiesWithEpisodeLabels(t *testing.T) { f.s.runCheck(context.Background(), f.s.log, f.topic) - if f.notifier.calls != 1 { - t.Fatalf("expected 1 summary notification, got %d", f.notifier.calls) + evs := f.emitter.ofType(events.DownloadSubmitted) + if len(evs) != 1 { + t.Fatalf("expected 1 download.submitted event, got %d", len(evs)) } - body := f.notifier.lastMsg.Body + body := evs[0].Body if !strings.Contains(body, "s01e01") || !strings.Contains(body, "s01e02") { t.Errorf("body = %q, want both episode labels", body) } @@ -1006,8 +1020,8 @@ func (f *fakeTrackerWithCreds) Verify(_ context.Context, _ *domain.TrackerCreden } // newSessionFixture builds a scheduler wired for session-expiry tests. -// creds is injected as the credentials repo; notifier is the fake notifier. -func newSessionFixture(t *testing.T, creds credentialsRepo, notifier eventNotifier) (*Scheduler, *domain.Topic) { +// creds is injected as the credentials repo; emit is the fake emitter. +func newSessionFixture(t *testing.T, creds credentialsRepo, emit emitter) (*Scheduler, *domain.Topic) { t.Helper() cfg := &config.Config{ SchedulerEnabled: true, @@ -1028,7 +1042,7 @@ func newSessionFixture(t *testing.T, creds credentialsRepo, notifier eventNotifi }}, creds: creds, master: &fakeDecryptor{}, - notifier: notifier, + emit: emit, lookupTracker: func(_ string) registry.Tracker { return nil }, lookupClient: func(_ string) registry.Client { return nil }, jobs: make(chan *domain.Topic, 1), @@ -1067,9 +1081,9 @@ func TestLoadCredentials_SessionExpired_WonTransition(t *testing.T) { SecretEnc: []byte("secret"), } creds := &fakeCredsSession{stored: storedCred, markExpiredWon: true} - notifier := &fakeNotifier{} + emit := &fakeEmitter{} - s, topic := newSessionFixture(t, creds, notifier) + s, topic := newSessionFixture(t, creds, emit) topic.UserID = storedCred.UserID s.lookupTracker = func(_ string) registry.Tracker { return tr } @@ -1086,11 +1100,11 @@ func TestLoadCredentials_SessionExpired_WonTransition(t *testing.T) { if got := creds.markExpiredCalls; got != 1 { t.Errorf("MarkSessionExpired: want 1 call, got %d", got) } - if got := notifier.calls; got != 1 { - t.Errorf("notifier.Send: want 1 call, got %d", got) - } - if notifier.lastID != storedCred.UserID { - t.Errorf("notifier.Send userID: want %s, got %s", storedCred.UserID, notifier.lastID) + evs := emit.ofType(events.SessionExpired) + if len(evs) != 1 { + t.Errorf("session.expired emit: want 1, got %d", len(evs)) + } else if evs[0].UserID != storedCred.UserID { + t.Errorf("session.expired UserID: want %s, got %s", storedCred.UserID, evs[0].UserID) } } @@ -1115,9 +1129,9 @@ func TestLoadCredentials_SessionExpired_LostRace(t *testing.T) { SecretEnc: []byte("secret"), } creds := &fakeCredsSession{stored: storedCred, markExpiredWon: false} - notifier := &fakeNotifier{} + emit := &fakeEmitter{} - s, topic := newSessionFixture(t, creds, notifier) + s, topic := newSessionFixture(t, creds, emit) topic.UserID = storedCred.UserID s.lookupTracker = func(_ string) registry.Tracker { return tr } @@ -1134,8 +1148,8 @@ func TestLoadCredentials_SessionExpired_LostRace(t *testing.T) { if got := creds.markExpiredCalls; got != 1 { t.Errorf("MarkSessionExpired: want 1 attempt (the UPDATE is the gate), got %d", got) } - if got := notifier.calls; got != 0 { - t.Errorf("notifier.Send: want 0 calls (lost the transition race), got %d", got) + if evs := emit.ofType(events.SessionExpired); len(evs) != 0 { + t.Errorf("session.expired emit: want 0 (lost the transition race), got %d", len(evs)) } } @@ -1158,9 +1172,9 @@ func TestLoadCredentials_SessionExpired_AlreadyFlagged(t *testing.T) { SecretEnc: []byte("secret"), } creds := &fakeCredsSession{stored: storedCred} - notifier := &fakeNotifier{} + emit := &fakeEmitter{} - s, topic := newSessionFixture(t, creds, notifier) + s, topic := newSessionFixture(t, creds, emit) topic.UserID = storedCred.UserID s.lookupTracker = func(_ string) registry.Tracker { return tr } @@ -1177,48 +1191,92 @@ func TestLoadCredentials_SessionExpired_AlreadyFlagged(t *testing.T) { if got := creds.markExpiredCalls; got != 0 { t.Errorf("MarkSessionExpired: want 0 calls (already flagged), got %d", got) } - if got := notifier.calls; got != 0 { - t.Errorf("notifier.Send: want 0 calls (already flagged), got %d", got) + if evs := emit.ofType(events.SessionExpired); len(evs) != 0 { + t.Errorf("session.expired emit: want 0 (already flagged), got %d", len(evs)) } } -// TestNotifyUpdated_RoutesToTopicNotifier verifies notifyUpdated forwards -// the topic's NotifierID to the dispatcher so a per-topic override is honoured. +// TestNotifyUpdated_RoutesToTopicNotifier verifies notifyUpdated carries +// the topic's NotifierID in the emitted event so the bus can route via it. func TestNotifyUpdated_RoutesToTopicNotifier(t *testing.T) { - notifier := &fakeNotifier{} - s := &Scheduler{cfg: &config.Config{PublicBaseURL: "http://x"}, notifier: notifier} + emit := &fakeEmitter{} + topicID := uuid.New() + s := &Scheduler{cfg: &config.Config{PublicBaseURL: "http://x"}, emit: emit} notifierID := uuid.New() - topic := &domain.Topic{UserID: uuid.New(), DisplayName: "My Show", NotifierID: ¬ifierID} + topic := &domain.Topic{ID: topicID, UserID: uuid.New(), DisplayName: "My Show", NotifierID: ¬ifierID} s.notifyUpdated(context.Background(), topic, []string{"s01e01"}) - if notifier.calls != 1 { - t.Fatalf("want 1 notification, got %d", notifier.calls) - } - if notifier.lastEvent != "updated" { - t.Errorf("event = %q, want updated", notifier.lastEvent) + evs := emit.ofType(events.DownloadSubmitted) + if len(evs) != 1 { + t.Fatalf("want 1 download.submitted event, got %d", len(evs)) } - if notifier.lastNotifierID == nil || *notifier.lastNotifierID != notifierID { - t.Errorf("notifierID = %v, want %s", notifier.lastNotifierID, notifierID) + ev := evs[0] + if ev.NotifierID == nil || *ev.NotifierID != notifierID { + t.Errorf("NotifierID = %v, want %s", ev.NotifierID, notifierID) } } // TestNotifyUpdated_NilNotifierID_GlobalFanOut verifies a topic without an -// override passes nil through (global fan-out, unchanged behaviour). +// override emits nil NotifierID (global fan-out, unchanged behaviour). func TestNotifyUpdated_NilNotifierID_GlobalFanOut(t *testing.T) { - notifier := &fakeNotifier{} - s := &Scheduler{cfg: &config.Config{PublicBaseURL: "http://x"}, notifier: notifier} + emit := &fakeEmitter{} + topicID := uuid.New() + s := &Scheduler{cfg: &config.Config{PublicBaseURL: "http://x"}, emit: emit} - topic := &domain.Topic{UserID: uuid.New(), DisplayName: "My Show", NotifierID: nil} + topic := &domain.Topic{ID: topicID, UserID: uuid.New(), DisplayName: "My Show", NotifierID: nil} s.notifyUpdated(context.Background(), topic, []string{"s01e01"}) - if notifier.calls != 1 { - t.Fatalf("want 1 notification, got %d", notifier.calls) + evs := emit.ofType(events.DownloadSubmitted) + if len(evs) != 1 { + t.Fatalf("want 1 download.submitted event, got %d", len(evs)) + } + if evs[0].NotifierID != nil { + t.Errorf("NotifierID = %v, want nil", evs[0].NotifierID) + } +} + +// TestRunCheck_NewRelease_EmitsReleaseFoundAndSubmitted drives one check +// that detects a new hash and submits one payload, asserting the typed +// events fire in the expected order with no check.failed emitted. +func TestRunCheck_NewRelease_EmitsReleaseFoundAndSubmitted(t *testing.T) { + const hash = "c12fe1c06bba254a9dc9f519b335aa7c1367a88a" + tr := &fakeTracker{ + name: "faketracker", + checks: []checkResult{ + {check: &domain.Check{Hash: "new-hash"}, err: nil}, + }, + downloads: []downloadResult{ + {payload: &domain.Payload{MagnetURI: "magnet:?xt=urn:btih:" + hash}, err: nil}, + {err: registry.ErrNoPendingEpisodes}, + }, + } + f := newFixture(t, tr, false) + + f.s.runCheck(context.Background(), f.s.log, f.topic) + + typs := f.emitter.types() + + // Must contain release.found and download.submitted. + found := func(tp events.Type) bool { + for _, ty := range typs { + if ty == tp { + return true + } + } + return false + } + if !found(events.ReleaseFound) { + t.Errorf("expected release.found in emitted events, got %v", typs) + } + if !found(events.DownloadSubmitted) { + t.Errorf("expected download.submitted in emitted events, got %v", typs) } - if notifier.lastNotifierID != nil { - t.Errorf("notifierID = %v, want nil", notifier.lastNotifierID) + // Must NOT emit check.failed on success. + if found(events.CheckFailed) { + t.Errorf("unexpected check.failed in emitted events, got %v", typs) } } diff --git a/docs/superpowers/plans/2026-06-25-phase1-event-spine.md b/docs/superpowers/plans/2026-06-25-phase1-event-spine.md new file mode 100644 index 0000000..a879835 --- /dev/null +++ b/docs/superpowers/plans/2026-06-25-phase1-event-spine.md @@ -0,0 +1,1705 @@ +# Phase 1 — Event Spine + History Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Introduce a typed event taxonomy emitted from one place (`events.Bus`) that tees to the `topic_events` history table and the existing notifier dispatcher, expose per-event notifier subscriptions in the UI, and add a read-only per-topic event timeline. + +**Architecture:** A new `internal/events` package owns the canonical `Type`, the `Event` struct, a per-type **policy table** (persist / notifiable / sse), and a `Bus.Emit` that fans each event to its sinks. The scheduler's existing direct `notifier.Send/SendVia("updated"|"error", …)` calls are replaced by `bus.Emit(typed Event)`. The dispatcher's `subscribed()` gains a legacy alias map so existing `['updated','error']` notifier rows keep working. The SSE sink is a no-op seam in Phase 1 (wired to the hub in Phase 3). + +**Tech Stack:** Go 1.25 (chi, pgx v5, zerolog, uuid), React 19.2 + Vite + Tailwind 4 + shadcn + React Query, Vitest. + +## Global Constraints + +- **No new DB migration.** The `topic_events` table already exists (`db/migrations/0001_initial_schema.sql:106`): `{id BIGSERIAL, topic_id UUID, user_id UUID, event_type TEXT, severity TEXT CHECK(info|warn|error), message TEXT, data JSONB, created_at TIMESTAMPTZ}`. +- **Canonical event type strings (one-way door — do not rename):** `topic.added`, `check.started`, `check.completed`, `release.found`, `download.submitted`, `download.progress`, `download.completed`, `check.failed`, `session.expired`. +- **Notifier-subscribable subset (the only 4 the UI offers):** `release.found`, `download.submitted`, `download.completed`, and the error pair `check.failed`/`session.expired`. `check.started`, `download.progress`, `topic.added`, `check.completed` are never notifier-subscribable. +- **Phase 1 does NOT emit `download.progress` or `download.completed`** (those come from the Phase 2 watcher) and does NOT build the SSE hub (Phase 3). The policy table and SSE seam are defined now; the SSE sink is nil. +- **Backward compat (broad alias):** legacy `"updated"` ⇒ {`release.found`, `download.submitted`, `download.completed`}; legacy `"error"` ⇒ {`check.failed`, `session.expired`}. +- **Best-effort/fail-open everywhere:** an emit failure (persist or notify) is logged and never blocks a check, matching the existing `recordDelivery` ethos. +- **Go:** tabs, `gofmt`, wrap errors `fmt.Errorf("…: %w", err)`, table-driven tests `package ` for white-box, `t.Helper()` in fakes. Run `docker run --rm -v "E:/Projects/Stukans/Marauder/backend:/backend" -w //backend golang:1.25 sh -c "go build ./... && go vet ./... && go test -race ./..."`. +- **Frontend:** `interface` for object shapes, `@/` alias, React Query keys from `QK` only, `useT()` for copy, `lucide-react` icons, max 250 lines/file. Run `docker run --rm -v "E:/Projects/Stukans/Marauder/frontend:/frontend" -w //frontend node:20-alpine sh -c "npm run typecheck && npm test && npm run build"`. +- **No AI-attribution in commits.** Imperative subject ≤72 chars. Reference `#93`. + +--- + +## File Structure + +**Backend (new):** +- `backend/internal/events/event.go` — `Type` consts, `Event` struct, `Policy`, `policyFor(Type)`. +- `backend/internal/events/event_test.go` — policy table assertions. +- `backend/internal/events/bus.go` — `Bus`, consumer-side seams (`notifier`, `recorder`, `publisher`), `New`, `Emit`. +- `backend/internal/events/bus_test.go` — fan-out routing with fakes. +- `backend/internal/db/repo/topic_events.go` — `TopicEvents` repo (`Record`, `ListForTopic`, `ListForUserSince`). +- `backend/internal/db/repo/topic_events_test.go` — repo SQL/scan via a fake pool. +- `backend/internal/api/handlers/topic_events.go` — `GET /topics/{id}/events` handler. +- `backend/internal/api/handlers/topic_events_test.go`. + +**Backend (modified):** +- `backend/internal/notify/dispatcher.go` — alias map in `subscribed()`. +- `backend/internal/notify/dispatcher_test.go` — alias cases (create if absent). +- `backend/internal/scheduler/scheduler.go` — swap `eventNotifier` for an `emitter` seam; emit typed events. +- `backend/internal/scheduler/scheduler_test.go` — assert emitted events. +- `backend/internal/api/handlers/topics.go` — emit `topic.added` in `Create`. +- `backend/internal/api/handlers/notifiers.go` — canonical default events + validation. +- `backend/internal/api/router.go` — register `GET /topics/{id}/events`. +- `backend/cmd/server/main.go` — construct `events.Bus`, wire into scheduler + topics handler. + +**Frontend (new):** +- `frontend/src/lib/events.ts` — canonical event constants + `EVENT_LABELS` map. +- `frontend/src/components/topics/TopicEventsTimeline.tsx` — read-only timeline. +- `frontend/src/components/topics/TopicEventsTimeline.test.tsx`. +- `frontend/src/components/notifiers/EventPicker.tsx` — shared checkbox group (Add + Edit). +- `frontend/src/components/notifiers/EventPicker.test.tsx`. + +**Frontend (modified):** +- `frontend/src/lib/api.ts` — `topicEvents(id)` method + `TopicEvent` interface. +- `frontend/src/lib/queryKeys.ts` — `topicEvents(id)` key. +- `frontend/src/pages/Notifiers.tsx` — use `EventPicker` in `AddNotifierCard`; update `eventLabel`. +- `frontend/src/components/notifiers/EditNotifierCard.tsx` — use `EventPicker`. +- `frontend/src/i18n/en.ts` + `frontend/src/i18n/ru.ts` — event labels + timeline copy. + +--- + +## Task 1: events package — Type constants + policy table + +**Files:** +- Create: `backend/internal/events/event.go` +- Test: `backend/internal/events/event_test.go` + +**Interfaces:** +- Produces: `events.Type` (string), the nine `Type` consts, `events.Policy{Persist, Notifiable, SSE bool}`, `events.PolicyFor(t Type) Policy`, `events.NotifiableTypes() []Type`. + +- [ ] **Step 1: Write the failing test** + +```go +package events + +import "testing" + +func TestPolicyFor(t *testing.T) { + tests := []struct { + typ Type + persist, notify, sse bool + }{ + {TopicAdded, true, false, true}, + {CheckStarted, false, false, true}, + {CheckCompleted, false, false, true}, + {ReleaseFound, true, true, true}, + {DownloadSubmitted, true, true, true}, + {DownloadProgress, false, false, true}, + {DownloadCompleted, true, true, true}, + {CheckFailed, true, true, true}, + {SessionExpired, true, true, true}, + } + for _, tt := range tests { + t.Run(string(tt.typ), func(t *testing.T) { + p := PolicyFor(tt.typ) + if p.Persist != tt.persist || p.Notifiable != tt.notify || p.SSE != tt.sse { + t.Errorf("PolicyFor(%s) = %+v, want persist=%v notify=%v sse=%v", + tt.typ, p, tt.persist, tt.notify, tt.sse) + } + }) + } +} + +func TestPolicyFor_Unknown_DefaultsToInert(t *testing.T) { + p := PolicyFor(Type("nope.nope")) + if p.Persist || p.Notifiable || p.SSE { + t.Errorf("unknown type should be inert, got %+v", p) + } +} + +func TestNotifiableTypes(t *testing.T) { + got := NotifiableTypes() + want := map[Type]bool{ReleaseFound: true, DownloadSubmitted: true, DownloadCompleted: true, CheckFailed: true, SessionExpired: true} + if len(got) != len(want) { + t.Fatalf("got %d notifiable types, want %d", len(got), len(want)) + } + for _, ty := range got { + if !want[ty] { + t.Errorf("unexpected notifiable type %s", ty) + } + } +} +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `docker run --rm -v "E:/Projects/Stukans/Marauder/backend:/backend" -w //backend golang:1.25 sh -c "go test ./internal/events/..."` +Expected: FAIL — package/identifiers undefined. + +- [ ] **Step 3: Write minimal implementation** + +```go +// Package events defines the canonical event taxonomy emitted across the +// backend and the per-type policy that decides, for each event, whether it +// is persisted to history, eligible for notifier fan-out, and pushed over +// the live SSE feed. It is the single source of truth for "what happens to +// an event". +package events + +// Type is a canonical event-type identifier. These strings are a one-way +// door: notifier subscriptions persist them, so they must not be renamed. +type Type string + +const ( + TopicAdded Type = "topic.added" + CheckStarted Type = "check.started" + CheckCompleted Type = "check.completed" + ReleaseFound Type = "release.found" + DownloadSubmitted Type = "download.submitted" + DownloadProgress Type = "download.progress" + DownloadCompleted Type = "download.completed" + CheckFailed Type = "check.failed" + SessionExpired Type = "session.expired" +) + +// Policy describes the routing for one event type. +type Policy struct { + Persist bool // write a topic_events history row + Notifiable bool // eligible for notifier fan-out (subject to subscription) + SSE bool // push over the live feed +} + +var policies = map[Type]Policy{ + TopicAdded: {Persist: true, Notifiable: false, SSE: true}, + CheckStarted: {Persist: false, Notifiable: false, SSE: true}, + CheckCompleted: {Persist: false, Notifiable: false, SSE: true}, + ReleaseFound: {Persist: true, Notifiable: true, SSE: true}, + DownloadSubmitted: {Persist: true, Notifiable: true, SSE: true}, + DownloadProgress: {Persist: false, Notifiable: false, SSE: true}, + DownloadCompleted: {Persist: true, Notifiable: true, SSE: true}, + CheckFailed: {Persist: true, Notifiable: true, SSE: true}, + SessionExpired: {Persist: true, Notifiable: true, SSE: true}, +} + +// PolicyFor returns the routing policy for t. An unknown type is inert +// (no persist, no notify, no SSE) — a defensive default. +func PolicyFor(t Type) Policy { return policies[t] } + +// NotifiableTypes returns the event types a notifier may subscribe to. +func NotifiableTypes() []Type { + var out []Type + for t, p := range policies { + if p.Notifiable { + out = append(out, t) + } + } + return out +} +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `docker run --rm -v "E:/Projects/Stukans/Marauder/backend:/backend" -w //backend golang:1.25 sh -c "go test ./internal/events/..."` +Expected: PASS. + +- [ ] **Step 5: Commit** + +```bash +git add backend/internal/events/event.go backend/internal/events/event_test.go +git commit -m "feat: add event taxonomy and per-type policy table (#93)" +``` + +--- + +## Task 2: events.Bus — fan-out to recorder + notifier + sse seam + +**Files:** +- Create: `backend/internal/events/bus.go` +- Test: `backend/internal/events/bus_test.go` + +**Interfaces:** +- Consumes: `events.Type`, `events.PolicyFor` (Task 1); `domain.Message`, `domain.TopicEvent`. +- Produces: + - `events.Event` struct: `{UserID uuid.UUID; TopicID *uuid.UUID; NotifierID *uuid.UUID; Type Type; Severity, Title, Body, Link string; Data map[string]any}`. + - Seams: `Recorder interface { Record(ctx, *domain.TopicEvent) (int64, error) }`, `Notifier interface { SendVia(ctx, userID uuid.UUID, notifierID *uuid.UUID, event string, msg domain.Message) int }`, `Publisher interface { Publish(userID uuid.UUID, ev Event, id int64) }`. + - `func New(rec Recorder, notif Notifier, pub Publisher, log zerolog.Logger) *Bus`. + - `func (b *Bus) Emit(ctx context.Context, ev Event)`. + +- [ ] **Step 1: Write the failing test** + +```go +package events + +import ( + "context" + "testing" + + "github.com/google/uuid" + "github.com/rs/zerolog" + + "github.com/artyomsv/marauder/backend/internal/domain" +) + +type fakeRecorder struct { + got *domain.TopicEvent + retID int64 + called int +} + +func (f *fakeRecorder) Record(_ context.Context, e *domain.TopicEvent) (int64, error) { + f.called++ + f.got = e + return f.retID, nil +} + +type fakeNotifier struct { + event string + notifierID *uuid.UUID + called int +} + +func (f *fakeNotifier) SendVia(_ context.Context, _ uuid.UUID, nid *uuid.UUID, event string, _ domain.Message) int { + f.called++ + f.event = event + f.notifierID = nid + return 1 +} + +type fakePublisher struct { + called int + id int64 +} + +func (f *fakePublisher) Publish(_ uuid.UUID, _ Event, id int64) { f.called++; f.id = id } + +func newBus(t *testing.T) (*Bus, *fakeRecorder, *fakeNotifier, *fakePublisher) { + t.Helper() + rec := &fakeRecorder{retID: 42} + notif := &fakeNotifier{} + pub := &fakePublisher{} + return New(rec, notif, pub, zerolog.Nop()), rec, notif, pub +} + +func TestEmit_ReleaseFound_PersistsNotifiesAndPublishes(t *testing.T) { + bus, rec, notif, pub := newBus(t) + tid := uuid.New() + bus.Emit(context.Background(), Event{ + UserID: uuid.New(), TopicID: &tid, Type: ReleaseFound, + Severity: "info", Title: "X", Body: "Y", + }) + if rec.called != 1 { + t.Errorf("recorder called %d, want 1", rec.called) + } + if rec.got.EventType != string(ReleaseFound) { + t.Errorf("recorded type %s, want release.found", rec.got.EventType) + } + if notif.called != 1 || notif.event != string(ReleaseFound) { + t.Errorf("notifier called %d event %q", notif.called, notif.event) + } + if pub.called != 1 || pub.id != 42 { + t.Errorf("publisher called %d id %d, want 1/42", pub.called, pub.id) + } +} + +func TestEmit_CheckStarted_PublishesOnly(t *testing.T) { + bus, rec, notif, pub := newBus(t) + tid := uuid.New() + bus.Emit(context.Background(), Event{UserID: uuid.New(), TopicID: &tid, Type: CheckStarted}) + if rec.called != 0 { + t.Errorf("recorder called %d, want 0", rec.called) + } + if notif.called != 0 { + t.Errorf("notifier called %d, want 0", notif.called) + } + if pub.called != 1 { + t.Errorf("publisher called %d, want 1", pub.called) + } + if pub.id != 0 { + t.Errorf("ephemeral publish id %d, want 0", pub.id) + } +} + +func TestEmit_NilSeams_NoPanic(t *testing.T) { + bus := New(nil, nil, nil, zerolog.Nop()) + tid := uuid.New() + bus.Emit(context.Background(), Event{UserID: uuid.New(), TopicID: &tid, Type: ReleaseFound}) + // no panic = pass +} +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `docker run --rm -v "E:/Projects/Stukans/Marauder/backend:/backend" -w //backend golang:1.25 sh -c "go test ./internal/events/..."` +Expected: FAIL — `Bus`, `New`, `Event` undefined. + +- [ ] **Step 3: Write minimal implementation** + +```go +package events + +import ( + "context" + + "github.com/google/uuid" + "github.com/rs/zerolog" + + "github.com/artyomsv/marauder/backend/internal/domain" +) + +// Event is one thing that happened, tagged with its canonical Type. +type Event struct { + UserID uuid.UUID + TopicID *uuid.UUID // nil for non-topic events (e.g. credential session.expired) + NotifierID *uuid.UUID // per-topic notifier override; nil => the user's default notifiers + Type Type + Severity string // info | warn | error (defaults to "info" when empty) + Title string + Body string + Link string + Data map[string]any +} + +// Recorder persists an event to the topic_events history table. +type Recorder interface { + Record(ctx context.Context, e *domain.TopicEvent) (int64, error) +} + +// Notifier fans an event out to a user's notifier plugins. Implemented by +// *notify.Dispatcher. The scheduler already proved this seam shape. +type Notifier interface { + SendVia(ctx context.Context, userID uuid.UUID, notifierID *uuid.UUID, event string, msg domain.Message) int +} + +// Publisher pushes an event onto the live SSE feed. Phase 1 wires nil; the +// Phase 3 hub implements it. id is the persisted history id (0 if ephemeral). +type Publisher interface { + Publish(userID uuid.UUID, ev Event, id int64) +} + +// Bus is the single event->sinks fan-out point. Every sink is optional +// (nil-safe) so the bus is cheap to construct in tests and across phases. +type Bus struct { + rec Recorder + notif Notifier + pub Publisher + log zerolog.Logger +} + +// New constructs a Bus. Any of rec/notif/pub may be nil. +func New(rec Recorder, notif Notifier, pub Publisher, log zerolog.Logger) *Bus { + return &Bus{rec: rec, notif: notif, pub: pub, log: log.With().Str("component", "events").Logger()} +} + +// Emit routes ev to its policy-selected sinks. Best-effort: every sink +// failure is logged and never propagated — emitting an event must never +// break the caller's flow. +func (b *Bus) Emit(ctx context.Context, ev Event) { + if ev.Severity == "" { + ev.Severity = "info" + } + p := PolicyFor(ev.Type) + + var id int64 + if p.Persist && b.rec != nil && ev.TopicID != nil { + rec := &domain.TopicEvent{ + TopicID: *ev.TopicID, + UserID: ev.UserID, + EventType: string(ev.Type), + Severity: ev.Severity, + Message: ev.Title, + Data: ev.Data, + } + got, err := b.rec.Record(ctx, rec) + if err != nil { + b.log.Warn().Err(err).Str("type", string(ev.Type)).Msg("events: record failed") + } else { + id = got + } + } + + if p.Notifiable && b.notif != nil { + b.notif.SendVia(ctx, ev.UserID, ev.NotifierID, string(ev.Type), domain.Message{ + Title: ev.Title, Body: ev.Body, Link: ev.Link, + }) + } + + if p.SSE && b.pub != nil { + b.pub.Publish(ev.UserID, ev, id) + } +} +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `docker run --rm -v "E:/Projects/Stukans/Marauder/backend:/backend" -w //backend golang:1.25 sh -c "go test ./internal/events/..."` +Expected: PASS. + +- [ ] **Step 5: Commit** + +```bash +git add backend/internal/events/bus.go backend/internal/events/bus_test.go +git commit -m "feat: add events.Bus fan-out to history, notifier, sse (#93)" +``` + +--- + +## Task 3: TopicEvents repo + +**Files:** +- Create: `backend/internal/db/repo/topic_events.go` +- Test: `backend/internal/db/repo/topic_events_test.go` + +**Interfaces:** +- Consumes: `domain.TopicEvent`. +- Produces: + - `type TopicEvents struct{…}`, `func NewTopicEvents(pool *pgxpool.Pool) *TopicEvents`. + - `Record(ctx, *domain.TopicEvent) (int64, error)` — INSERT … RETURNING id. + - `ListForTopic(ctx, topicID, userID uuid.UUID, limit int, beforeID int64) ([]*domain.TopicEvent, error)` — newest first; `beforeID==0` means "from newest". + - `ListForUserSince(ctx, userID uuid.UUID, sinceID int64) ([]*domain.TopicEvent, error)` — ascending; for Phase 3 SSE replay. + +- [ ] **Step 1: Write the failing test** + +```go +package repo + +import ( + "context" + "encoding/json" + "testing" + "time" + + "github.com/google/uuid" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" + + "github.com/artyomsv/marauder/backend/internal/domain" +) + +// fakeRow implements pgx.Row for QueryRow-based Record. +type fakeRow struct{ id int64 } + +func (r fakeRow) Scan(dest ...any) error { + *(dest[0].(*int64)) = r.id + return nil +} + +type fakeTEPool struct { + lastSQL string + lastArgs []any + row fakeRow +} + +func (f *fakeTEPool) QueryRow(_ context.Context, sql string, args ...any) pgx.Row { + f.lastSQL = sql + f.lastArgs = args + return f.row +} +func (f *fakeTEPool) Query(_ context.Context, _ string, _ ...any) (pgx.Rows, error) { + return nil, nil +} +func (f *fakeTEPool) Exec(_ context.Context, _ string, _ ...any) (pgconn.CommandTag, error) { + return pgconn.CommandTag{}, nil +} + +func TestTopicEvents_Record_ReturnsID_AndMarshalsData(t *testing.T) { + pool := &fakeTEPool{row: fakeRow{id: 7}} + r := &TopicEvents{pool: pool} + tid, uid := uuid.New(), uuid.New() + id, err := r.Record(context.Background(), &domain.TopicEvent{ + TopicID: tid, UserID: uid, EventType: "release.found", Severity: "info", + Message: "New release", Data: map[string]any{"labels": []string{"s01e01"}}, + CreatedAt: time.Now(), + }) + if err != nil { + t.Fatalf("Record: %v", err) + } + if id != 7 { + t.Errorf("id = %d, want 7", id) + } + // data arg must be JSON bytes, not a raw map (pgx can't encode map->jsonb directly here) + raw, ok := pool.lastArgs[5].([]byte) + if !ok { + t.Fatalf("data arg type %T, want []byte", pool.lastArgs[5]) + } + var back map[string]any + if err := json.Unmarshal(raw, &back); err != nil { + t.Fatalf("data not valid JSON: %v", err) + } +} +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `docker run --rm -v "E:/Projects/Stukans/Marauder/backend:/backend" -w //backend golang:1.25 sh -c "go test ./internal/db/repo/ -run TopicEvents"` +Expected: FAIL — `TopicEvents` undefined. + +- [ ] **Step 3: Write minimal implementation** + +```go +package repo + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/google/uuid" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" + "github.com/jackc/pgx/v5/pgxpool" + + "github.com/artyomsv/marauder/backend/internal/domain" +) + +// topicEventsPool is the minimal pgx surface used by TopicEvents. +type topicEventsPool interface { + QueryRow(ctx context.Context, sql string, args ...any) pgx.Row + Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error) + Exec(ctx context.Context, sql string, args ...any) (pgconn.CommandTag, error) +} + +// TopicEvents is the repository for topic_events — a topic's history feed. +type TopicEvents struct { + pool topicEventsPool +} + +// NewTopicEvents constructs the repository. +func NewTopicEvents(pool *pgxpool.Pool) *TopicEvents { + return &TopicEvents{pool: pool} +} + +// Record inserts a history row and returns its serial id. Data is marshalled +// to JSON for the jsonb column (nil Data → SQL NULL). +func (r *TopicEvents) Record(ctx context.Context, e *domain.TopicEvent) (int64, error) { + var data []byte + if e.Data != nil { + b, err := json.Marshal(e.Data) + if err != nil { + return 0, fmt.Errorf("topic_events: marshal data: %w", err) + } + data = b + } + const q = ` +INSERT INTO topic_events (topic_id, user_id, event_type, severity, message, data) +VALUES ($1, $2, $3, $4, $5, $6) +RETURNING id` + var id int64 + if err := r.pool.QueryRow(ctx, q, e.TopicID, e.UserID, e.EventType, e.Severity, e.Message, data).Scan(&id); err != nil { + return 0, fmt.Errorf("topic_events: record: %w", err) + } + return id, nil +} + +// ListForTopic returns a topic's events, newest first, capped at limit. +// beforeID==0 returns from the newest; a positive beforeID pages older. +func (r *TopicEvents) ListForTopic(ctx context.Context, topicID, userID uuid.UUID, limit int, beforeID int64) ([]*domain.TopicEvent, error) { + if limit <= 0 || limit > 200 { + limit = 50 + } + const q = ` +SELECT id, topic_id, user_id, event_type, severity, message, data, created_at +FROM topic_events +WHERE topic_id = $1 AND user_id = $2 AND ($3 = 0 OR id < $3) +ORDER BY id DESC +LIMIT $4` + rows, err := r.pool.Query(ctx, q, topicID, userID, beforeID, limit) + if err != nil { + return nil, fmt.Errorf("topic_events: list: %w", err) + } + defer rows.Close() + return scanTopicEvents(rows) +} + +// ListForUserSince returns a user's events with id > sinceID, oldest first. +// Used by the Phase 3 SSE reconnect replay. +func (r *TopicEvents) ListForUserSince(ctx context.Context, userID uuid.UUID, sinceID int64) ([]*domain.TopicEvent, error) { + const q = ` +SELECT id, topic_id, user_id, event_type, severity, message, data, created_at +FROM topic_events +WHERE user_id = $1 AND id > $2 +ORDER BY id ASC +LIMIT 200` + rows, err := r.pool.Query(ctx, q, userID, sinceID) + if err != nil { + return nil, fmt.Errorf("topic_events: list since: %w", err) + } + defer rows.Close() + return scanTopicEvents(rows) +} + +func scanTopicEvents(rows pgx.Rows) ([]*domain.TopicEvent, error) { + var out []*domain.TopicEvent + for rows.Next() { + var e domain.TopicEvent + var data []byte + var createdAt time.Time + if err := rows.Scan(&e.ID, &e.TopicID, &e.UserID, &e.EventType, &e.Severity, &e.Message, &data, &createdAt); err != nil { + return nil, fmt.Errorf("topic_events: scan: %w", err) + } + if len(data) > 0 { + _ = json.Unmarshal(data, &e.Data) + } + e.CreatedAt = createdAt + out = append(out, &e) + } + return out, rows.Err() +} +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `docker run --rm -v "E:/Projects/Stukans/Marauder/backend:/backend" -w //backend golang:1.25 sh -c "go test ./internal/db/repo/ -run TopicEvents"` +Expected: PASS. + +- [ ] **Step 5: Commit** + +```bash +git add backend/internal/db/repo/topic_events.go backend/internal/db/repo/topic_events_test.go +git commit -m "feat: add TopicEvents repo for history feed (#93)" +``` + +--- + +## Task 4: Dispatcher legacy alias mapping + +**Files:** +- Modify: `backend/internal/notify/dispatcher.go` (the `subscribed` func at the bottom) +- Test: `backend/internal/notify/dispatcher_test.go` (create if absent) + +**Interfaces:** +- Produces: `subscribed(events []string, event string) bool` now matches canonical events through legacy aliases. No signature change. + +- [ ] **Step 1: Write the failing test** + +```go +package notify + +import "testing" + +func TestSubscribed_LegacyUpdated_MatchesNewReleaseEvents(t *testing.T) { + legacy := []string{"updated", "error"} + for _, ev := range []string{"release.found", "download.submitted", "download.completed"} { + if !subscribed(legacy, ev) { + t.Errorf("legacy 'updated' should match %s", ev) + } + } +} + +func TestSubscribed_LegacyError_MatchesErrorEvents(t *testing.T) { + legacy := []string{"error"} + for _, ev := range []string{"check.failed", "session.expired"} { + if !subscribed(legacy, ev) { + t.Errorf("legacy 'error' should match %s", ev) + } + } + if subscribed(legacy, "release.found") { + t.Error("legacy 'error' must NOT match release.found") + } +} + +func TestSubscribed_Canonical_DirectMatch(t *testing.T) { + if !subscribed([]string{"download.completed"}, "download.completed") { + t.Error("canonical event should match itself") + } + if subscribed([]string{"download.completed"}, "release.found") { + t.Error("canonical subscription must not match a different event") + } +} + +func TestSubscribed_EmptyMeansAll(t *testing.T) { + if !subscribed(nil, "anything") { + t.Error("empty subscription should match all") + } +} +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `docker run --rm -v "E:/Projects/Stukans/Marauder/backend:/backend" -w //backend golang:1.25 sh -c "go test ./internal/notify/ -run TestSubscribed"` +Expected: FAIL — legacy aliases not matched. + +- [ ] **Step 3: Write minimal implementation** (replace the existing `subscribed` func) + +```go +// legacyAliases maps a pre-taxonomy subscription keyword to the canonical +// event types it now covers, so notifiers created before per-event +// subscription (events = ['updated','error']) keep delivering. "updated" is +// intentionally broad — new releases, client submissions, and completions. +var legacyAliases = map[string][]string{ + "updated": {"release.found", "download.submitted", "download.completed"}, + "error": {"check.failed", "session.expired"}, +} + +// subscribed reports whether a notifier with the given subscription list +// should receive an event. An empty list (or empty event) means "all events". +// A subscription entry matches directly, or via its legacy alias expansion. +func subscribed(events []string, event string) bool { + if len(events) == 0 || event == "" { + return true + } + for _, e := range events { + if e == event { + return true + } + for _, alias := range legacyAliases[e] { + if alias == event { + return true + } + } + } + return false +} +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `docker run --rm -v "E:/Projects/Stukans/Marauder/backend:/backend" -w //backend golang:1.25 sh -c "go test ./internal/notify/..."` +Expected: PASS. + +- [ ] **Step 5: Commit** + +```bash +git add backend/internal/notify/dispatcher.go backend/internal/notify/dispatcher_test.go +git commit -m "feat: map legacy notifier events to canonical taxonomy (#93)" +``` + +--- + +## Task 5: Wire events.Bus into the scheduler + +Replaces the scheduler's direct `notifier.Send/SendVia("updated"|"error")` calls with typed `bus.Emit`. The scheduler swaps its `eventNotifier` field for an `emitter` seam (the bus). `release.found` fires when a new hash is detected; `download.submitted` fires per delivery; `check.failed` replaces `notifyError`; `session.expired` replaces the in-loop `Send`. `check.started`/`check.completed` are added (ephemeral, Phase 3 surfaces them — but emitting now is harmless and testable). + +**Files:** +- Modify: `backend/internal/scheduler/scheduler.go` +- Test: `backend/internal/scheduler/scheduler_test.go` + +**Interfaces:** +- Consumes: `events.Event`, `events.Type` consts, `events.Bus.Emit` (Tasks 1–2). +- Produces: scheduler `emitter interface { Emit(ctx context.Context, ev events.Event) }`; `New(...)` last param becomes `emit emitter` (was `notifier eventNotifier`). + +- [ ] **Step 1: Write the failing test** — add to `scheduler_test.go`. Use the existing test harness; add a fake emitter capturing events. + +```go +type fakeEmitter struct { + mu sync.Mutex + events []events.Event +} + +func (f *fakeEmitter) Emit(_ context.Context, ev events.Event) { + f.mu.Lock() + defer f.mu.Unlock() + f.events = append(f.events, ev) +} + +func (f *fakeEmitter) types() []events.Type { + f.mu.Lock() + defer f.mu.Unlock() + var out []events.Type + for _, e := range f.events { + out = append(out, e.Type) + } + return out +} + +// TestRunCheck_NewRelease_EmitsReleaseFoundAndSubmitted drives one check that +// detects a new hash and submits one payload, asserting the typed events fire. +func TestRunCheck_NewRelease_EmitsReleaseFoundAndSubmitted(t *testing.T) { + // Build the scheduler with the existing test helpers, passing emit=&fakeEmitter{}. + // (Mirror the existing "new release" scheduler test's setup — same fakes for + // topics/clients/tracker — but inject the fake emitter as the last New() arg.) + // After runCheck: + // types := emit.types() + // assert it contains events.ReleaseFound and events.DownloadSubmitted + // assert NO events.CheckFailed +} +``` + +> Implementer note: the existing `scheduler_test.go` already constructs a scheduler with fakes for a new-release scenario (the test that asserts the old `"updated"` `SendVia`). Copy that setup, swap the notifier fake for `&fakeEmitter{}`, and assert on `emit.types()`. Replace the old assertion on the notifier fake. + +- [ ] **Step 2: Run test to verify it fails** + +Run: `docker run --rm -v "E:/Projects/Stukans/Marauder/backend:/backend" -w //backend golang:1.25 sh -c "go test ./internal/scheduler/ -run TestRunCheck_NewRelease_EmitsReleaseFound"` +Expected: FAIL — `New` signature mismatch / emitter undefined. + +- [ ] **Step 3: Implement the scheduler changes** + +3a. Add the import and replace the `eventNotifier` interface (lines 86–94) with: + +```go +// emitter is the subset of *events.Bus the scheduler uses to publish typed +// lifecycle events. Defined as an interface so the scheduler stays +// unit-testable without the bus, and nil-safe in tests that ignore events. +type emitter interface { + Emit(ctx context.Context, ev events.Event) +} +``` + +Add to imports: `"github.com/artyomsv/marauder/backend/internal/events"`. + +3b. Rename the struct field (line 135) and constructor param/assignment (lines 135, 156, 161): + +```go +// struct field +emit emitter // nil-safe; publishes typed lifecycle events + +// New signature — last param +func New(cfg *config.Config, log zerolog.Logger, topics *repo.Topics, clients *repo.Clients, creds *repo.TrackerCredentials, deliveries *repo.Deliveries, master *crypto.MasterKey, emit emitter) *Scheduler { + // ... assign: emit: emit, +} +``` + +3c. Replace `notifyUpdated` (lines 426–446) body's send with an emit, and rename to keep call site: + +```go +func (s *Scheduler) notifyUpdated(ctx context.Context, t *domain.Topic, labels []string) { + if s.emit == nil || len(labels) == 0 { + return + } + const maxList = 10 + shown := labels + overflow := 0 + if len(shown) > maxList { + overflow = len(shown) - maxList + shown = shown[:maxList] + } + body := "Sent to client: " + strings.Join(shown, ", ") + if overflow > 0 { + body += fmt.Sprintf(" (+%d more)", overflow) + } + s.emit.Emit(ctx, events.Event{ + UserID: t.UserID, TopicID: &t.ID, NotifierID: t.NotifierID, + Type: events.DownloadSubmitted, Severity: "info", + Title: t.DisplayName, Body: body, Link: s.cfg.PublicBaseURL + "/topics", + }) +} +``` + +3d. Replace `notifyError` (lines 455–464): + +```go +func (s *Scheduler) notifyError(ctx context.Context, t *domain.Topic, errMsg string) { + if s.emit == nil || t.ConsecutiveErrors > 0 { + return + } + s.emit.Emit(ctx, events.Event{ + UserID: t.UserID, TopicID: &t.ID, NotifierID: t.NotifierID, + Type: events.CheckFailed, Severity: "error", + Title: "Topic check failed: " + t.DisplayName, Body: errMsg, + Link: s.cfg.PublicBaseURL + "/topics", + }) +} +``` + +3e. Replace the in-loop session-expiry `Send` (line ~511) — it has a credential, not a topic. Emit with `TopicID: &t.ID` (so it shows on the topic timeline) and `NotifierID: t.NotifierID`: + +```go +s.emit.Emit(ctx, events.Event{ + UserID: stored.UserID, TopicID: &t.ID, NotifierID: t.NotifierID, + Type: events.SessionExpired, Severity: "error", + Title: "Tracker session expired", + Body: t.TrackerName + " needs re-authentication", + Link: s.cfg.PublicBaseURL + "/credentials", +}) +``` + +> Behavior note: session-expiry moves from `Send` (all notifiers) to the same `SendVia(NotifierID)`/defaults routing as other topic events — more consistent, and now visible on the topic timeline. Keep the atomic `MarkSessionExpired` dedup gate exactly as-is so it still fires once. + +3f. Add `release.found` emission where a new hash is first detected (in `runCheck`, right after `check.Hash != t.LastHash` is confirmed and before `downloadAllPending`). Add `check.started` at the top of `runCheck` (after plugin lookup) and `check.completed` just before the final `recordResult` success call: + +```go +// after confirming new hash, before draining episodes: +if s.emit != nil { + s.emit.Emit(ctx, events.Event{ + UserID: t.UserID, TopicID: &t.ID, NotifierID: t.NotifierID, + Type: events.ReleaseFound, Severity: "info", + Title: t.DisplayName, Body: "New release detected", + Link: s.cfg.PublicBaseURL + "/topics", + }) +} +``` + +```go +// check.started — top of runCheck, once the tracker plugin is resolved: +if s.emit != nil { + s.emit.Emit(ctx, events.Event{UserID: t.UserID, TopicID: &t.ID, Type: events.CheckStarted}) +} + +// check.completed — before the success recordResult, carrying next_check_at: +if s.emit != nil { + s.emit.Emit(ctx, events.Event{ + UserID: t.UserID, TopicID: &t.ID, Type: events.CheckCompleted, + Data: map[string]any{"next_check_at": nextCheckAt.UTC().Format(time.RFC3339)}, + }) +} +``` + +> Implementer note: `nextCheckAt` is the value passed to `recordResult` on the success path (`s.backoff(t, false, nil)`). Compute it once into a local and reuse for both the emit and `recordResult` to keep them consistent. + +- [ ] **Step 4: Run tests to verify they pass** (and existing scheduler tests still pass) + +Run: `docker run --rm -v "E:/Projects/Stukans/Marauder/backend:/backend" -w //backend golang:1.25 sh -c "go test -race ./internal/scheduler/..."` +Expected: PASS. Fix any existing test that referenced the old notifier fake by switching it to the emitter fake. + +- [ ] **Step 5: Commit** + +```bash +git add backend/internal/scheduler/scheduler.go backend/internal/scheduler/scheduler_test.go +git commit -m "feat: emit typed lifecycle events from scheduler (#93)" +``` + +--- + +## Task 6: Construct the Bus and wire it (main.go) + emit topic.added + +**Files:** +- Modify: `backend/cmd/server/main.go` +- Modify: `backend/internal/api/handlers/topics.go` (the `Create` handler) +- Test: `backend/internal/api/handlers/topics_test.go` (extend the create test) + +**Interfaces:** +- Consumes: `events.New`, `repo.NewTopicEvents`, `events.Bus.Emit`. +- Produces: `handlers.Topics` gains an `Emit func(ctx, events.Event)` (or an `emitter` field) used by `Create`. + +- [ ] **Step 1: Write the failing test** (handler) — assert `Create` calls the emitter with `topic.added`. + +```go +func TestTopics_Create_EmitsTopicAdded(t *testing.T) { + var got []events.Event + h := &Topics{ + // ... existing fakes used by the create test ... + Emit: func(_ context.Context, ev events.Event) { got = append(got, ev) }, + } + // drive POST /topics with a valid body (reuse the existing create test setup) + // then: + if len(got) != 1 || got[0].Type != events.TopicAdded { + t.Fatalf("want one topic.added event, got %+v", got) + } +} +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `docker run --rm -v "E:/Projects/Stukans/Marauder/backend:/backend" -w //backend golang:1.25 sh -c "go test ./internal/api/handlers/ -run TestTopics_Create_EmitsTopicAdded"` +Expected: FAIL — `Emit` field undefined. + +- [ ] **Step 3: Implement** + +3a. In `topics.go`, add an optional emit hook to the `Topics` handler struct and call it after a successful `Create` (right after the topic is persisted, before writing the response): + +```go +// field on Topics handler: +Emit func(ctx context.Context, ev events.Event) // nil-safe + +// after successful create (created holds the new topic): +if h.Emit != nil { + h.Emit(r.Context(), events.Event{ + UserID: created.UserID, TopicID: &created.ID, NotifierID: created.NotifierID, + Type: events.TopicAdded, Severity: "info", + Title: created.DisplayName, Body: "Topic added", + Link: h.BaseURL + "/topics", + }) +} +``` + +Add import `"github.com/artyomsv/marauder/backend/internal/events"`. Guard nil so existing handler tests that don't set `Emit` keep passing. + +3b. In `main.go`, construct the repo, bus, and wire it (near lines 105–144): + +```go +topicEventsRepo := repo.NewTopicEvents(pool) +disp := notify.New(notifiersRepo, master, logger) +bus := events.New(topicEventsRepo, disp, nil, logger) // SSE publisher nil until Phase 3 +sch := scheduler.New(cfg, logger, topicsRepo, clientsRepo, credsRepo, deliveriesRepo, master, bus) +``` + +And set `Emit: bus.Emit` on the `handlers.Topics` construction (where the handler struct is built, ~line 165+). + +Add import `"github.com/artyomsv/marauder/backend/internal/events"`. + +- [ ] **Step 4: Run build + tests** + +Run: `docker run --rm -v "E:/Projects/Stukans/Marauder/backend:/backend" -w //backend golang:1.25 sh -c "go build ./... && go test ./internal/api/handlers/ -run TestTopics_Create"` +Expected: PASS, build clean. + +- [ ] **Step 5: Commit** + +```bash +git add backend/cmd/server/main.go backend/internal/api/handlers/topics.go backend/internal/api/handlers/topics_test.go +git commit -m "feat: wire events.Bus and emit topic.added on create (#93)" +``` + +--- + +## Task 7: GET /topics/{id}/events endpoint + +**Files:** +- Create: `backend/internal/api/handlers/topic_events.go` +- Test: `backend/internal/api/handlers/topic_events_test.go` +- Modify: `backend/internal/api/router.go` (register route), `backend/cmd/server/main.go` (inject repo into handler) + +**Interfaces:** +- Consumes: `repo.TopicEvents.ListForTopic`, `currentUserID`, `problem`, `writeJSON`. +- Produces: handler `TopicEvents{ Events topicEventsStore; Topics topicOwnerStore; BaseURL string }` with `List(w, r)`; response `{ "events": [ {id, event_type, severity, message, data, created_at} ] }`. + +- [ ] **Step 1: Write the failing test** + +```go +func TestTopicEvents_List_ReturnsTopicHistory(t *testing.T) { + tid := uuid.New() + store := &fakeTopicEventsStore{rows: []*domain.TopicEvent{ + {ID: 2, TopicID: tid, EventType: "release.found", Severity: "info", Message: "New release", CreatedAt: time.Now()}, + {ID: 1, TopicID: tid, EventType: "check.failed", Severity: "error", Message: "boom", CreatedAt: time.Now()}, + }} + h := &TopicEvents{Events: store, Topics: &fakeTopicOwner{ok: true}, BaseURL: ""} + // GET /topics/{id}/events with an authed context (reuse the handlers test helper that injects userID) + // assert 200, body.events length 2, first event_type == "release.found" +} +``` + +> Implementer note: mirror the existing `topics_test.go` request-building helper (it injects the auth claims into the request context). `fakeTopicOwner.GetByID` returns a topic owned by the test user when `ok`. + +- [ ] **Step 2: Run test to verify it fails** + +Run: `docker run --rm -v "E:/Projects/Stukans/Marauder/backend:/backend" -w //backend golang:1.25 sh -c "go test ./internal/api/handlers/ -run TestTopicEvents_List"` +Expected: FAIL — `TopicEvents` handler undefined. + +- [ ] **Step 3: Implement** + +```go +package handlers + +import ( + "context" + "net/http" + "strconv" + + "github.com/go-chi/chi/v5" + "github.com/google/uuid" + + "github.com/artyomsv/marauder/backend/internal/domain" + "github.com/artyomsv/marauder/backend/internal/problem" +) + +// topicEventsStore is the consumer seam over *repo.TopicEvents. +type topicEventsStore interface { + ListForTopic(ctx context.Context, topicID, userID uuid.UUID, limit int, beforeID int64) ([]*domain.TopicEvent, error) +} + +// topicOwnerStore verifies topic ownership before returning its history. +type topicOwnerStore interface { + GetByID(ctx context.Context, id, userID uuid.UUID) (*domain.Topic, error) +} + +// TopicEvents handles GET /topics/{id}/events. +type TopicEvents struct { + Events topicEventsStore + Topics topicOwnerStore + BaseURL string +} + +type topicEventView struct { + ID int64 `json:"id"` + EventType string `json:"event_type"` + Severity string `json:"severity"` + Message string `json:"message"` + Data map[string]any `json:"data,omitempty"` + CreatedAt string `json:"created_at"` +} + +// List handles GET /topics/{id}/events?limit=&before=. +func (h *TopicEvents) List(w http.ResponseWriter, r *http.Request) { + uid, perr := currentUserID(r) + if perr != nil { + problem.Write(w, r, h.BaseURL, perr) + return + } + id, ierr := uuid.Parse(chi.URLParam(r, "id")) + if ierr != nil { + problem.Write(w, r, h.BaseURL, problem.ErrBadRequest("invalid id")) + return + } + if _, err := h.Topics.GetByID(r.Context(), id, uid); err != nil { + problem.Write(w, r, h.BaseURL, problem.ErrNotFound("topic not found")) + return + } + limit, _ := strconv.Atoi(r.URL.Query().Get("limit")) + before, _ := strconv.ParseInt(r.URL.Query().Get("before"), 10, 64) + rows, err := h.Events.ListForTopic(r.Context(), id, uid, limit, before) + if err != nil { + problem.Write(w, r, h.BaseURL, problem.ErrInternal(err.Error())) + return + } + out := make([]topicEventView, 0, len(rows)) + for _, e := range rows { + out = append(out, topicEventView{ + ID: e.ID, EventType: e.EventType, Severity: e.Severity, Message: e.Message, + Data: e.Data, CreatedAt: e.CreatedAt.UTC().Format("2006-01-02T15:04:05Z"), + }) + } + writeJSON(w, http.StatusOK, map[string]any{"events": out}) +} +``` + +3b. Register in `router.go` next to the status route (`r.Get("/topics/{id}/status", …)`): + +```go +r.Get("/topics/{id}/events", topicEventsH.List) +``` + +3c. In `main.go`, construct `topicEventsH := &handlers.TopicEvents{Events: topicEventsRepo, Topics: topicsRepo, BaseURL: cfg.PublicBaseURL}` and pass it into the router deps (follow how `topicsH` is threaded into `router.New`/the deps struct). + +- [ ] **Step 4: Run build + tests** + +Run: `docker run --rm -v "E:/Projects/Stukans/Marauder/backend:/backend" -w //backend golang:1.25 sh -c "go build ./... && go vet ./... && go test -race ./..."` +Expected: PASS. + +- [ ] **Step 5: Commit** + +```bash +git add backend/internal/api/handlers/topic_events.go backend/internal/api/handlers/topic_events_test.go backend/internal/api/router.go backend/cmd/server/main.go +git commit -m "feat: add GET /topics/{id}/events history endpoint (#93)" +``` + +--- + +## Task 8: Notifier handler — canonical default + validation + +**Files:** +- Modify: `backend/internal/api/handlers/notifiers.go` (Create + Update default; add validation) +- Test: `backend/internal/api/handlers/notifiers_test.go` (create if absent) + +**Interfaces:** +- Consumes: `events.NotifiableTypes`. +- Produces: a package-level `validNotifierEvents(events []string) []string` that drops non-notifiable/unknown events; empty → full canonical notifiable set. + +- [ ] **Step 1: Write the failing test** + +```go +func TestValidNotifierEvents_FiltersAndDefaults(t *testing.T) { + // empty -> full canonical notifiable set (5 entries) + if got := validNotifierEvents(nil); len(got) != 5 { + t.Errorf("default set size = %d, want 5", len(got)) + } + // drops legacy 'updated' is allowed-through (kept for back-compat) but + // unknown junk is dropped + got := validNotifierEvents([]string{"release.found", "bogus.event", "download.completed"}) + for _, e := range got { + if e == "bogus.event" { + t.Errorf("bogus event should be dropped: %v", got) + } + } + if len(got) != 2 { + t.Errorf("got %v, want [release.found download.completed]", got) + } +} +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `docker run --rm -v "E:/Projects/Stukans/Marauder/backend:/backend" -w //backend golang:1.25 sh -c "go test ./internal/api/handlers/ -run TestValidNotifierEvents"` +Expected: FAIL — `validNotifierEvents` undefined. + +- [ ] **Step 3: Implement** — add helper and use it in Create (lines 114–117) and Update (lines 214–217), replacing the `["updated","error"]` default: + +```go +// validNotifierEvents keeps only events a notifier may subscribe to: +// the canonical notifiable set, plus the legacy "updated"/"error" keywords +// (still honored via dispatcher aliases). Empty input defaults to the full +// canonical notifiable set. Unknown/non-notifiable events are dropped. +func validNotifierEvents(in []string) []string { + allowed := map[string]bool{"updated": true, "error": true} + var canonical []string + for _, ty := range events.NotifiableTypes() { + allowed[string(ty)] = true + canonical = append(canonical, string(ty)) + } + if len(in) == 0 { + return canonical + } + out := make([]string, 0, len(in)) + seen := map[string]bool{} + for _, e := range in { + if allowed[e] && !seen[e] { + out = append(out, e) + seen[e] = true + } + } + if len(out) == 0 { + return canonical + } + return out +} +``` + +Replace both `events := req.Events; if len(events) == 0 { events = []string{"updated","error"} }` blocks with `events := validNotifierEvents(req.Events)`. Add import `"github.com/artyomsv/marauder/backend/internal/events"`. + +> Note: `NotifiableTypes()` is unordered (map iteration). That's fine for storage, but for a stable default ordering you may sort `canonical` with `sort.Strings(canonical)` before returning. + +- [ ] **Step 4: Run build + tests** + +Run: `docker run --rm -v "E:/Projects/Stukans/Marauder/backend:/backend" -w //backend golang:1.25 sh -c "go test ./internal/api/handlers/ -run TestValidNotifierEvents && go build ./..."` +Expected: PASS. + +- [ ] **Step 5: Commit** + +```bash +git add backend/internal/api/handlers/notifiers.go backend/internal/api/handlers/notifiers_test.go +git commit -m "feat: validate notifier events against canonical set (#93)" +``` + +--- + +## Task 9: Frontend — event constants, API method, query key + +**Files:** +- Create: `frontend/src/lib/events.ts` +- Modify: `frontend/src/lib/api.ts`, `frontend/src/lib/queryKeys.ts` + +**Interfaces:** +- Produces: `NOTIFIABLE_EVENTS` (canonical), `EVENT_LABELS` (i18n key map), `api.topicEvents(id)`, `TopicEvent` interface, `QK.topicEvents(id)`. + +- [ ] **Step 1: Write the failing test** (api shape) — `frontend/src/lib/events.test.ts` + +```ts +import { describe, it, expect } from "vitest"; +import { NOTIFIABLE_EVENTS, EVENT_LABELS } from "@/lib/events"; + +describe("events catalog", () => { + it("exposes the four notifiable groups", () => { + expect(NOTIFIABLE_EVENTS).toContain("release.found"); + expect(NOTIFIABLE_EVENTS).toContain("download.submitted"); + expect(NOTIFIABLE_EVENTS).toContain("download.completed"); + expect(NOTIFIABLE_EVENTS).toContain("check.failed"); + }); + it("has an i18n label key for every notifiable event", () => { + for (const e of NOTIFIABLE_EVENTS) { + expect(EVENT_LABELS[e]).toBeTruthy(); + } + }); +}); +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `docker run --rm -v "E:/Projects/Stukans/Marauder/frontend:/frontend" -w //frontend node:20-alpine sh -c "npm test -- events.test"` +Expected: FAIL — module missing. + +- [ ] **Step 3: Implement** + +`frontend/src/lib/events.ts`: + +```ts +// Canonical notifier-subscribable checkbox keys (the four UI groups). +export const NOTIFIABLE_EVENTS = [ + "release.found", + "download.submitted", + "download.completed", + "check.failed", +] as const; + +export type NotifiableEvent = (typeof NOTIFIABLE_EVENTS)[number]; + +// Each checkbox key maps to the canonical backend event(s) it stores. The +// "check.failed" box covers BOTH error events so session.expired alerts are +// delivered when the user opts into "errors" — without this, the backend's +// distinct session.expired event would silently never match. +export const EVENT_GROUP_EVENTS: Record = { + "release.found": ["release.found"], + "download.submitted": ["download.submitted"], + "download.completed": ["download.completed"], + "check.failed": ["check.failed", "session.expired"], +}; + +// Flattened canonical default: every notifiable backend event. Use as the +// initial subscription for a new notifier so all groups (incl. session.expired) +// are on by default — matching the backend's empty-input default. +export const ALL_NOTIFIABLE_EVENTS: string[] = Object.values(EVENT_GROUP_EVENTS).flat(); + +// i18n keys for each event label, used by the notifier picker and the +// per-topic timeline. Timeline-only (non-notifiable) events are included so +// the history view can render them too. +export const EVENT_LABELS: Record = { + "topic.added": "events.topic_added", + "check.started": "events.check_started", + "check.completed": "events.check_completed", + "release.found": "events.release_found", + "download.submitted": "events.download_submitted", + "download.progress": "events.download_progress", + "download.completed": "events.download_completed", + "check.failed": "events.check_failed", + "session.expired": "events.session_expired", + // legacy aliases still present on older notifier rows + updated: "events.release_found", + error: "events.check_failed", +}; +``` + +`queryKeys.ts` — add after `topicStatus`: + +```ts + // /topics/{id}/events — read-only per-topic history timeline. + topicEvents: (id: string) => ["topicEvents", id] as const, +``` + +`api.ts` — add the interface and method (near `topicStatus`): + +```ts +export interface TopicEvent { + id: number; + event_type: string; + severity: "info" | "warn" | "error"; + message: string; + data?: Record; + created_at: string; +} + +// inside the api object: + topicEvents: (id: string) => + api.get<{ events: TopicEvent[] }>(`/topics/${id}/events`), +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `docker run --rm -v "E:/Projects/Stukans/Marauder/frontend:/frontend" -w //frontend node:20-alpine sh -c "npm run typecheck && npm test -- events.test"` +Expected: PASS. + +- [ ] **Step 5: Commit** + +```bash +git add frontend/src/lib/events.ts frontend/src/lib/events.test.ts frontend/src/lib/api.ts frontend/src/lib/queryKeys.ts +git commit -m "feat: frontend event catalog, topicEvents api + key (#93)" +``` + +--- + +## Task 10: Frontend — EventPicker + Notifiers wiring + +**Files:** +- Create: `frontend/src/components/notifiers/EventPicker.tsx`, `EventPicker.test.tsx` +- Modify: `frontend/src/pages/Notifiers.tsx` (AddNotifierCard + `eventLabel`), `frontend/src/components/notifiers/EditNotifierCard.tsx` + +**Interfaces:** +- Produces: ` void} />` — checkbox group over `NOTIFIABLE_EVENTS`, maps legacy `updated`/`error` to checked groups on first render. + +- [ ] **Step 1: Write the failing test** — `EventPicker.test.tsx` + +```tsx +import { describe, it, expect, vi } from "vitest"; +import { render, screen } from "@testing-library/react"; +import userEvent from "@testing-library/user-event"; +import { EventPicker } from "@/components/notifiers/EventPicker"; + +describe("EventPicker", () => { + it("renders a checkbox per notifiable event and toggles", async () => { + const onChange = vi.fn(); + render(); + const completed = screen.getByLabelText(/download finished/i); + expect(completed).not.toBeChecked(); + await userEvent.click(completed); + expect(onChange).toHaveBeenCalledWith( + expect.arrayContaining(["release.found", "download.completed"]), + ); + }); + + it("shows legacy 'updated' as the three release-group boxes checked", () => { + render( {}} />); + expect(screen.getByLabelText(/new release/i)).toBeChecked(); + expect(screen.getByLabelText(/sent to client/i)).toBeChecked(); + expect(screen.getByLabelText(/download finished/i)).toBeChecked(); + expect(screen.getByLabelText(/error/i)).toBeChecked(); + }); +}); +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `docker run --rm -v "E:/Projects/Stukans/Marauder/frontend:/frontend" -w //frontend node:20-alpine sh -c "npm test -- EventPicker"` +Expected: FAIL — component missing. + +- [ ] **Step 3: Implement** `EventPicker.tsx` + +```tsx +import { useT } from "@/i18n"; +import { NOTIFIABLE_EVENTS, EVENT_GROUP_EVENTS, EVENT_LABELS } from "@/lib/events"; + +interface EventPickerProps { + value: string[]; + onChange: (next: string[]) => void; +} + +// expandStored turns a stored subscription (which may contain the legacy +// "updated"/"error" keywords) into the set of canonical backend events. +function expandStored(value: string[]): Set { + const set = new Set(); + for (const v of value) { + if (v === "updated") { + set.add("release.found"); + set.add("download.submitted"); + set.add("download.completed"); + } else if (v === "error") { + set.add("check.failed"); + set.add("session.expired"); + } else { + set.add(v); + } + } + return set; +} + +export function EventPicker({ value, onChange }: EventPickerProps) { + const t = useT(); + const selected = expandStored(value); + + // A group box is checked if ANY of its canonical events is selected. + const groupChecked = (key: (typeof NOTIFIABLE_EVENTS)[number]) => + EVENT_GROUP_EVENTS[key].some((e) => selected.has(e)); + + const toggleGroup = (key: (typeof NOTIFIABLE_EVENTS)[number], on: boolean) => { + const next = expandStored(value); + for (const e of EVENT_GROUP_EVENTS[key]) { + if (on) next.add(e); + else next.delete(e); + } + onChange([...next]); + }; + + return ( +
+ {t("notifiers.notify_on")}: + {NOTIFIABLE_EVENTS.map((key) => ( + + ))} +
+ ); +} +``` + +3b. In `Notifiers.tsx`, replace the inline `["updated","error"]` checkbox block (lines 318–334) with ``, change the initial state to `useState(ALL_NOTIFIABLE_EVENTS)` (import `ALL_NOTIFIABLE_EVENTS` and `EventPicker`). Update `eventLabel` (lines 34–40) to read from `EVENT_LABELS` so cards render the new labels. + +3c. In `EditNotifierCard.tsx`, swap its events checkbox block for `` (same import). + +- [ ] **Step 4: Run tests** + +Run: `docker run --rm -v "E:/Projects/Stukans/Marauder/frontend:/frontend" -w //frontend node:20-alpine sh -c "npm run typecheck && npm test -- EventPicker Notifiers"` +Expected: PASS. + +- [ ] **Step 5: Commit** + +```bash +git add frontend/src/components/notifiers/EventPicker.tsx frontend/src/components/notifiers/EventPicker.test.tsx frontend/src/pages/Notifiers.tsx frontend/src/components/notifiers/EditNotifierCard.tsx +git commit -m "feat: per-event notifier subscription picker UI (#93)" +``` + +--- + +## Task 11: Frontend — per-topic event timeline + +**Files:** +- Create: `frontend/src/components/topics/TopicEventsTimeline.tsx`, `TopicEventsTimeline.test.tsx` +- Modify: the topic row/detail in `frontend/src/pages/Topics.tsx` to mount the timeline (e.g. an expandable "History" section alongside the existing `DeliveryStatus`). + +**Interfaces:** +- Consumes: `api.topicEvents`, `QK.topicEvents`, `EVENT_LABELS`, `TopicEvent`. +- Produces: `` — read-only, query-driven list, newest first. + +- [ ] **Step 1: Write the failing test** — `TopicEventsTimeline.test.tsx` + +```tsx +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { render, screen } from "@testing-library/react"; +import { QueryClient, QueryClientProvider } from "@tanstack/react-query"; +import { TopicEventsTimeline } from "@/components/topics/TopicEventsTimeline"; +import { api } from "@/lib/api"; + +vi.mock("@/lib/api", () => ({ + api: { topicEvents: vi.fn() }, +})); + +function wrap(ui: React.ReactNode) { + const qc = new QueryClient({ defaultOptions: { queries: { retry: false } } }); + return render({ui}); +} + +describe("TopicEventsTimeline", () => { + beforeEach(() => vi.clearAllMocks()); + it("renders events newest-first with labels", async () => { + (api.topicEvents as ReturnType).mockResolvedValue({ + events: [ + { id: 2, event_type: "release.found", severity: "info", message: "New release", created_at: "2026-06-25T10:00:00Z" }, + { id: 1, event_type: "check.failed", severity: "error", message: "boom", created_at: "2026-06-25T09:00:00Z" }, + ], + }); + wrap(); + expect(await screen.findByText("New release")).toBeInTheDocument(); + expect(screen.getByText("boom")).toBeInTheDocument(); + }); +}); +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `docker run --rm -v "E:/Projects/Stukans/Marauder/frontend:/frontend" -w //frontend node:20-alpine sh -c "npm test -- TopicEventsTimeline"` +Expected: FAIL — component missing. + +- [ ] **Step 3: Implement** `TopicEventsTimeline.tsx` + +```tsx +import { useQuery } from "@tanstack/react-query"; +import { Loader2, Clock } from "lucide-react"; + +import { api } from "@/lib/api"; +import { QK } from "@/lib/queryKeys"; +import { useT } from "@/i18n"; +import { EVENT_LABELS } from "@/lib/events"; + +interface TopicEventsTimelineProps { + topicId: string; +} + +const severityDot: Record = { + info: "bg-primary", + warn: "bg-warning", + error: "bg-destructive", +}; + +export function TopicEventsTimeline({ topicId }: TopicEventsTimelineProps) { + const t = useT(); + const { data, isLoading } = useQuery({ + queryKey: QK.topicEvents(topicId), + queryFn: () => api.topicEvents(topicId), + }); + const events = data?.events ?? []; + + if (isLoading) { + return ( +
+ + {t("common.loading")} +
+ ); + } + if (events.length === 0) { + return ( +
+ + {t("topics.history.empty")} +
+ ); + } + return ( +
    + {events.map((e) => ( +
  1. + +
    +
    + {EVENT_LABELS[e.event_type] ? t(EVENT_LABELS[e.event_type]) : e.event_type} +
    + {e.message &&
    {e.message}
    } + +
    +
  2. + ))} +
+ ); +} +``` + +3b. Mount it in `Topics.tsx` where `DeliveryStatus` is rendered for an expanded topic — add a small "History" disclosure that renders ``. Keep `Topics.tsx` within its size budget (it already breaches 250 lines — extract the disclosure into the timeline component, do not inline more than the mount). + +- [ ] **Step 4: Run tests** + +Run: `docker run --rm -v "E:/Projects/Stukans/Marauder/frontend:/frontend" -w //frontend node:20-alpine sh -c "npm run typecheck && npm test -- TopicEventsTimeline"` +Expected: PASS. + +- [ ] **Step 5: Commit** + +```bash +git add frontend/src/components/topics/TopicEventsTimeline.tsx frontend/src/components/topics/TopicEventsTimeline.test.tsx frontend/src/pages/Topics.tsx +git commit -m "feat: per-topic event timeline view (#93)" +``` + +--- + +## Task 12: i18n keys + +**Files:** +- Modify: `frontend/src/i18n/en.ts`, `frontend/src/i18n/ru.ts` + +**Interfaces:** +- Produces: translation keys referenced by Tasks 10–11. + +- [ ] **Step 1: Add keys to `en.ts`** (and Russian equivalents to `ru.ts`): + +```ts + "notifiers.notify_on": "Notify on", + "events.topic_added": "Topic added", + "events.check_started": "Checking", + "events.check_completed": "Checked", + "events.release_found": "New release", + "events.download_submitted": "Sent to client", + "events.download_progress": "Downloading", + "events.download_completed": "Download finished", + "events.check_failed": "Error", + "events.session_expired": "Session expired", + "topics.history.empty": "No history yet", +``` + +For the EventPicker test labels to match (`/new release/i`, `/sent to client/i`, `/download finished/i`, `/error/i`), keep those English strings. + +- [ ] **Step 2: Verify build + full frontend suite** + +Run: `docker run --rm -v "E:/Projects/Stukans/Marauder/frontend:/frontend" -w //frontend node:20-alpine sh -c "npm run typecheck && npm test && npm run build"` +Expected: PASS (typecheck catches any missing key references if the i18n dict is typed). + +- [ ] **Step 3: Commit** + +```bash +git add frontend/src/i18n/en.ts frontend/src/i18n/ru.ts +git commit -m "feat: i18n labels for event taxonomy and timeline (#93)" +``` + +--- + +## Task 13: Docs — CLAUDE.md, CHANGELOG, getting-started + +**Files:** +- Modify: `CLAUDE.md` (backend table: new `events` package + `TopicEvents` repo + scheduler emits typed events; frontend: timeline + event picker), `CHANGELOG.md` (`[Unreleased]`), `docs/getting-started.md` if it documents notifier events. + +- [ ] **Step 1: Update `CLAUDE.md`** — add an `events` row to the backend package table: + +``` +| **`events`** | canonical event taxonomy (`Type` consts) + per-type `Policy` (persist/notifiable/sse) + `Bus.Emit` — the single event→sinks fan-out (history `topic_events`, notifier dispatcher, SSE seam). Phase-1 SSE publisher is nil | +``` + +Note `Topics` repo gains the `TopicEvents` sibling (history feed, `Record`/`ListForTopic`/`ListForUserSince`), the scheduler now emits typed `events.Event`s instead of calling the dispatcher directly, and the notifier dispatcher `subscribed()` carries legacy `updated`/`error` aliases. Frontend: `components/topics/TopicEventsTimeline`, `components/notifiers/EventPicker`, `lib/events.ts`. + +- [ ] **Step 2: Add a `CHANGELOG.md` `[Unreleased]` bullet:** + +```markdown +### Added +- Typed event taxonomy with per-event notifier subscriptions (new releases, sent-to-client, download finished, errors) and a read-only per-topic event timeline (#93). +``` + +- [ ] **Step 3: Commit** + +```bash +git add CLAUDE.md CHANGELOG.md docs/getting-started.md +git commit -m "docs: event taxonomy + notifier subscriptions + timeline (#93)" +``` + +--- + +## Final verification + +- [ ] **Backend full suite:** `docker run --rm -v "E:/Projects/Stukans/Marauder/backend:/backend" -w //backend golang:1.25 sh -c "go build ./... && go vet ./... && go test -race ./..."` → all pass. +- [ ] **Frontend full suite:** `docker run --rm -v "E:/Projects/Stukans/Marauder/frontend:/frontend" -w //frontend node:20-alpine sh -c "npm run typecheck && npm test && npm run build"` → all pass. +- [ ] **Manual smoke (dev stack):** create a notifier with only "Download finished" checked; confirm an existing `['updated','error']` notifier still shows all boxes checked and still delivers; open a topic's History and confirm `topic.added` appears. + +--- + +## Spec coverage check (Phase 1 scope of `2026-06-25-event-types-and-live-updates-design.md`) + +- §5 policy table → Task 1. §6.1 Bus → Task 2. §6.2 repo → Task 3. §6.3 alias → Task 4. §6.4 emit points (topic.added, check.started/completed, release.found, download.submitted, check.failed, session.expired) → Tasks 5–6. §7 notifier event editor → Tasks 8–10. §7 per-topic timeline → Tasks 7, 11. Phase-2 (`download.progress`/`completed` watcher) and Phase-3 (SSE hub, ticket, frontend EventSource, nginx) are **out of scope** here, by design — `download.completed`/`download.progress` policy rows exist but nothing emits them yet, and the Bus SSE publisher is nil. diff --git a/docs/superpowers/specs/2026-06-25-event-types-and-live-updates-design.md b/docs/superpowers/specs/2026-06-25-event-types-and-live-updates-design.md new file mode 100644 index 0000000..2150946 --- /dev/null +++ b/docs/superpowers/specs/2026-06-25-event-types-and-live-updates-design.md @@ -0,0 +1,239 @@ +# Event Types & Live UI Updates — Design Spec + +**Status:** Approved design (2026-06-25) — ready for implementation planning +**Topic:** A typed event taxonomy that notifiers subscribe to per-event-type, a server-side +download-progress watcher, and live server→client push via SSE for download progress, tracker-check +status, and a per-topic event timeline. + +--- + +## 1. Goal & decomposition + +Three independently shippable capabilities, all in scope, delivered in three phases: + +| # | Capability | Build state at start | +|---|---|---| +| **A** | Typed event taxonomy (topic added / check lifecycle / submitted / finished / errors) | Partially latent — `topic_events` table + `notifiers.events` filter exist, only `updated`/`error` emitted | +| **B** | Notifiers subscribe per-event-type | Data model + dispatch filter exist; **no UI editor**, only 2 events emitted | +| **C** | Live UI push (download progress + check status + event timeline) via **SSE** | Greenfield — no realtime transport | + +## 2. Locked decisions + +- **Scope:** all three capabilities, phased (§9). +- **Transport:** **SSE** (Server-Sent Events), not WebSocket — pure server→client push fan-out; + native browser reconnect; one-line nginx change; no extra Go dependency. +- **Subscription granularity:** **notifier-level only.** A notifier subscribes to a set of events; + a topic's existing `NotifierID` override picks *which* notifier (and thus inherits its event set). + No per-topic event mask. +- **Notifiable events (the 4 a notifier may subscribe to):** `release.found`, `download.submitted`, + `download.completed`, and the error pair `check.failed`/`session.expired`. +- **Live-feed-only (never notifier-subscribable):** `check.started`, `download.progress`, + `topic.added`, `check.completed` — too frequent/low-value for push notifications. +- **Backward compat:** existing notifier rows (`events=['updated','error']`) keep working via an + **alias map**. Legacy `"updated"` matches `release.found` + `download.submitted` + + `download.completed` (**broad**); legacy `"error"` matches `check.failed` + `session.expired`. + New UI writes canonical names. +- **Per-topic event timeline UI:** included (read-only), powered by wiring the dormant + `topic_events` table. + +## 3. Current state, grounded in code + +**Notifier side (B half-done):** +- `notifiers.events TEXT[] DEFAULT ['updated','error']` — `db/migrations/0001_initial_schema.sql:128` +- `notify.Dispatcher.Send/SendVia` already filter via `subscribed(n.Events, event)` — `notify/dispatcher.go:118` +- Handler defaults events to `["updated","error"]` — `api/handlers/notifiers.go:101`; UI renders them + read-only (`Notifiers.tsx:eventLabel`), no editor +- Only `"updated"` (new release, `scheduler.go:438`, per-topic `SendVia`) and `"error"` + (session expired, `scheduler.go:490`, global `Send`, atomically deduped) are emitted + +**Event-log backbone (A, dormant):** +- `topic_events` table — `0001_initial_schema.sql:106`: + `{id BIGSERIAL, topic_id, user_id, event_type, severity CHECK(info|warn|error), message, data JSONB, created_at}`, + indexed `(topic_id, created_at DESC)` and `(user_id, created_at DESC)`. **Nothing writes to it.** +- `domain.TopicEvent` type exists; no repo, no insert call. + +**Status/realtime (C, greenfield):** +- `GET /topics/{id}/status` reads `topic_deliveries`, enriches via + `registry.WithStatus.Status(ctx,cfg,hashes)` (10s timeout, fail-open) — `api/handlers/topics.go:358` +- qBittorrent + Transmission normalise to `StateDownloading/Seeding/Checking/Queued/Stopped/Error`, + `percent_done 0..1` +- `DeliveryStatus.tsx` adaptive-polls 3s active / 20s idle +- No WS/SSE; chi router is REST behind `RequireAuth` JWT middleware + +## 4. Architecture — a single event spine + +``` + ┌─────────────────────────────┐ + scheduler / handlers │ events.Bus.Emit(ctx, ev) │ + / progress watcher └──────────────┬──────────────┘ + │ per-type policy → tee (best-effort, non-blocking) + ┌─────────────────┼──────────────────┐ + ▼ ▼ ▼ + topic_events DB notify.Dispatcher SSE Hub + (history/timeline (telegram/email/…, (in-mem per-user + + SSE replay) filtered by .Events) pub/sub) + │ + browsers (EventSource) +``` + +One `events` package owns the canonical `Type`, the `Event` struct, the **policy table**, and the +`Bus` that tees to three sinks. The scheduler gains one new nil-safe seam (`eventEmitter`); its +current direct `notifier.Send/SendVia` calls are **replaced** by `emit()` so there is a single +emission point feeding all three consumers. + +## 5. Event model & per-type policy + +```go +type Type string +const ( + TopicAdded Type = "topic.added" + CheckStarted Type = "check.started" + CheckCompleted Type = "check.completed" + ReleaseFound Type = "release.found" + DownloadSubmitted Type = "download.submitted" + DownloadProgress Type = "download.progress" + DownloadCompleted Type = "download.completed" + CheckFailed Type = "check.failed" + SessionExpired Type = "session.expired" +) + +type Event struct { + ID int64 // topic_events BIGSERIAL → SSE Last-Event-ID (0 when not persisted) + UserID uuid.UUID + TopicID uuid.UUID + Type Type + Severity string // info | warn | error + Title string + Body string + Data map[string]any + At time.Time +} +``` + +**Policy table — the single source of truth (unit-tested):** + +| Type | persist (history) | notifier | SSE | +|---|:--:|:--:|:--:| +| `topic.added` | ✓ | — | ✓ | +| `check.started` | — | — | ✓ | +| `check.completed` | — | — | ✓ | +| `release.found` | ✓ | ✓ | ✓ | +| `download.submitted` | ✓ | ✓ | ✓ | +| `download.progress` | **—** | — | ✓ | +| `download.completed` | ✓ | ✓ | ✓ | +| `check.failed` | ✓ | ✓ | ✓ | +| `session.expired` | ✓ | ✓ | ✓ | + +`download.progress` is never persisted (write-storm avoidance) — SSE-only. `persist` events get a +real `ID` for SSE replay; ephemeral events carry `ID=0`. + +## 6. Backend components + +### 6.1 `events` package +`Event`, `Type`, `policy(Type) → {persist, notifiable, sse}`, and `Bus`: +- `Emit(ctx, Event)`: reads policy, then (a) if `persist`: insert into `topic_events`, set `ev.ID`; + (b) if `notifiable`: `notify.Dispatcher.SendVia(userID, notifierID, string(Type), msg)`; + (c) if `sse`: publish to hub. All best-effort — failures logged, never block the caller. +- Notifier-id source: events carry the topic's `NotifierID` so per-topic override is honored; + session-expiry stays global (`notifierID == nil`). + +### 6.2 `topic_events` repo (`db/repo/topic_events.go`) +`Record(ctx, Event) (int64, error)` (returns serial id) + `ListForTopic(ctx, topicID, userID, limit, beforeID)` +(paginated timeline) + `ListForUserSince(ctx, userID, sinceID)` (SSE reconnect replay of persisted events). + +### 6.3 Dispatcher alias mapping (`notify/dispatcher.go`) +Extend `subscribed(events, event)` with an alias map so legacy rows keep working: +`"updated"` ⇒ {`release.found`, `download.submitted`, `download.completed`} (broad); +`"error"` ⇒ {`check.failed`, `session.expired`}. New canonical names match directly. + +### 6.4 Emit points (replace/augment existing calls) +| Event | Site | +|---|---| +| `topic.added` | `handlers/topics.go:217` (after Create) | +| `check.started` | `scheduler.go` runCheck, before `tr.Check` | +| `check.completed` | `scheduler.go` after every non-errored check (both change and no-change paths), carries `next_check_at` in `Data` | +| `release.found` | `scheduler.go` when `check.Hash != t.LastHash`, before download loop | +| `download.submitted` | `scheduler.go:668` (after client `Add`, alongside `recordDelivery`) | +| `download.completed` | progress watcher (§6.5) | +| `check.failed` | `scheduler.go:360` error paths | +| `session.expired` | `scheduler.go:490` (keep atomic `MarkSessionExpired` dedup gate) | + +Today's `"updated"` (`scheduler.go:438`) and `"error"` (`:490`) `SendVia/Send` calls are removed in +favor of `emit()`. + +### 6.5 Progress watcher (`progress` package, Phase 2) +Bounded goroutine injected into the scheduler. Every ~5s: +1. Gather **in-flight** infohashes (deliveries not known-terminal), grouped topic→client. +2. For clients implementing `WithStatus`, call `Status(hashes)` (reuses `/status` plumbing). +3. Diff vs in-memory last-seen map: emit `download.progress` on percent change, + `download.completed` on transition to seeding-or-100%. +4. Terminal infohashes leave the watch set → **zero cost when idle**. +On startup, seed the watch set from `topic_deliveries` and classify once (bounded). Fail-open like +`/status`. + +### 6.6 SSE hub + endpoint (Phase 3) +- `GET /api/v1/events?ticket=…` → `text/event-stream`. Hub: `map[userID][]chan Event`, filtered by + `event.UserID`. Per-client buffered channel; **drop-on-full** so a slow client never blocks the bus. +- `:` heartbeat comment every ~25s to survive proxy idle timeouts. `id:` = `Event.ID` for persisted + events; on reconnect the browser sends `Last-Event-ID` → replay persisted events via + `ListForUserSince` (progress events are ephemeral, refreshed by the next poll, not replayed). +- **Auth:** `POST /api/v1/events/ticket` (normal JWT) → one-time token, in-memory, 30s TTL, bound to + userID. SSE endpoint consumes the ticket once and binds the connection to that user. Keeps the JWT + out of URLs/logs and is independent of the future HttpOnly-cookie migration. +- **nginx:** add a `/api/v1/events` location with `proxy_buffering off;` and a long read timeout in + **both** `deploy/nginx/gateway.conf` and the inline copy inside `deploy/docker-compose.ghcr.yml` + (keep-in-sync rule). + +## 7. Frontend components (Phase 3) + +- **`useEventStream()` provider** — one app-wide connection. Fetch ticket → open `EventSource`. On + `error`/close: refetch ticket + recreate (browser auto-reconnect can't refresh our ticket, so we + wrap it). Route by type: + - `download.progress` / `download.completed` → `queryClient.setQueryData(QK.topicStatus(topicId), …)` + - `release.found` / `download.submitted` → invalidate topics list + toast + - `check.started` / `check.completed` / `check.failed` → drive a live "checking…" pulse + + `next_check_at` countdown per topic row +- **Polling fallback** — `DeliveryStatus` poll stays, **disabled while SSE is connected**, re-enabled + at the slow 20s idle rate if the stream drops. No regression if SSE fails. +- **Notifiers page** — event-picker checkboxes for the 4 notifiable events, writing `notifiers.events`. + Render legacy `updated`/`error` rows correctly (map to the new labels). +- **Per-topic event timeline** — a read-only history view (drawer or tab on the topic) backed by + `GET /api/v1/topics/{id}/events` (paginated `ListForTopic`). New `QK.topicEvents(id)` query key. + +## 8. Error handling & testing + +**Fail-open everywhere** (matches codebase ethos): emit failures logged, never block a check; SSE hub +never blocks the emitter (drop-on-full per slow client); watcher errors fail-open like `/status`. + +**Tests:** +- `events`: policy-table assertions; `Bus.Emit` fan-out with fake persist/notify/sse sinks (verifies + each policy routes correctly); `ID` propagation. +- Dispatcher alias mapping (legacy `updated`/`error` → canonical sets). +- Progress watcher: fake `WithStatus`; assert percent-change → `progress`, seeding/100% → `completed`, + terminal-gating drops infohash from poll set. +- SSE hub: subscribe/publish/unsubscribe, ticket consume-once + TTL expiry, `Last-Event-ID` replay. +- `topic_events` repo: record + paginated `ListForTopic` + `ListForUserSince`. +- Frontend: `EventSource` mock → `setQueryData` routing; reconnect → ticket refetch; notifier + event-picker; timeline render. + +## 9. Implementation phasing + +1. **Phase 1 — event spine + history.** `events` pkg + policy + `topic_events` repo + emit points + + dispatcher aliases + notifier event-picker UI + per-topic timeline view. Ships richer notifications + and history with **zero transport risk**. +2. **Phase 2 — progress watcher.** `WithStatus` poller → `download.progress`/`download.completed`. + Enables "download finished" notifications. +3. **Phase 3 — SSE transport.** Hub + ticket endpoint + nginx + frontend `EventSource` + + retire-to-fallback polling + live check-status UI. + +## 10. Risks + +- **Event taxonomy is a one-way door** for notifier configs — names locked in §2/§5; renames later are + migrations. +- **"Download finished" requires the watcher** — confirmed in scope (Phase 2). +- **Single-process in-memory hub** — fine for the single-backend deployment; document a Redis/NATS + fan-out escape hatch before any multi-replica deploy. SSE reconnect replay via `topic_events` + already survives a backend restart for persisted events. +- **Notification spam** — `check.started` / `download.progress` are policy-enforced live-feed-only; + the notifier UI never offers them. +- **ghcr inline nginx drift** — the `/api/v1/events` location must be added to both gateway configs. diff --git a/frontend/src/components/notifiers/EditNotifierCard.tsx b/frontend/src/components/notifiers/EditNotifierCard.tsx index fc757ef..29eab57 100644 --- a/frontend/src/components/notifiers/EditNotifierCard.tsx +++ b/frontend/src/components/notifiers/EditNotifierCard.tsx @@ -10,6 +10,7 @@ import { Card } from "@/components/ui/card"; import { Input } from "@/components/ui/input"; import { Label } from "@/components/ui/label"; import { fieldsForPlugin } from "@/pages/Notifiers"; +import { EventPicker } from "@/components/notifiers/EventPicker"; interface EditNotifierCardProps { id: string; @@ -120,23 +121,7 @@ export function EditNotifierCard({ id, onClose, onSaved }: EditNotifierCardProps ))} -
- Notify on: - {(["updated", "error"] as const).map((ev) => ( - - ))} -
+