Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
2e94780
docs: add event-types & live SSE design spec (#93)
github-actions[bot] Jun 25, 2026
1b2a7ba
docs: add phase 1 event-spine implementation plan (#93)
github-actions[bot] Jun 25, 2026
790f82c
feat: add event taxonomy and per-type policy table (#93)
github-actions[bot] Jun 25, 2026
223f4b8
feat: add events.Bus fan-out to history, notifier, sse (#93)
github-actions[bot] Jun 25, 2026
80b53da
feat: add TopicEvents repo for history feed (#93)
github-actions[bot] Jun 25, 2026
1db30ac
feat: map legacy notifier events to canonical taxonomy (#93)
github-actions[bot] Jun 25, 2026
29977fe
feat: emit typed lifecycle events from scheduler (#93)
github-actions[bot] Jun 25, 2026
1fd89c3
feat: wire events.Bus and emit topic.added on create (#93)
github-actions[bot] Jun 25, 2026
2153019
feat: add GET /topics/{id}/events history endpoint (#93)
github-actions[bot] Jun 25, 2026
c56546d
feat: validate notifier events against canonical set (#93)
github-actions[bot] Jun 25, 2026
21fb861
feat: frontend event catalog, topicEvents api + key (#93)
github-actions[bot] Jun 25, 2026
8618cdd
feat: per-event notifier subscription picker UI (#93)
github-actions[bot] Jun 25, 2026
db2b093
feat: per-topic event timeline view (#93)
github-actions[bot] Jun 25, 2026
30d7701
feat: i18n labels for event taxonomy and timeline (#93)
github-actions[bot] Jun 25, 2026
28338e4
docs: event taxonomy + notifier subscriptions + timeline (#93)
github-actions[bot] Jun 25, 2026
a2a0c5b
docs: correct topic.added emitter and download-finished scope (#93)
github-actions[bot] Jun 25, 2026
53f77fc
fix: restore session-expiry re-auth guidance text (#93)
github-actions[bot] Jun 25, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Typed event taxonomy with per-event notifier subscriptions (new releases, sent-to-client, errors) and a read-only per-topic event timeline. Download-finished event emission is deferred to a later phase (#93).

## [1.4.0] - 2026-06-24

### Added
Expand Down
12 changes: 10 additions & 2 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@ techdebt/ Debt-tracking files (one per issue, see global rule)
| `config` | env-driven config struct (caarlos0/env) — **add new env vars here** |
| `crypto` | AES-256-GCM for tracker credentials and client config blobs |
| `db` / `db/repo` | pgxpool wrapper + repository structs (`Topics`, `Clients`, `Notifiers`, `Users`, `TrackerCredentials`, `Deliveries`, `Audit`, `Settings`). `Settings` (repo added for the Sonarr integration) reads/writes the singleton `settings` row — `GetSonarr`/`UpsertSonarr` (API key encrypted at rest like `oidc_client_secret_enc`, migration `0009`) + `UpdateSonarrCursor` (history-poll cursor). `Topics` adds `GetByURL(user,url)` for the poller's dedup pre-check; `Users` adds `GetInitialAdmin`. `TrackerCredentials` carries encrypted `secret_enc` (password) **and** `session_enc`/`session_nonce` (cookie-session blob; migration `0002`), plus a nullable `session_expired_at` marker (migration `0003`) the scheduler uses to dedupe expiry notifications (atomic `MarkSessionExpired`, cleared by `SetSession` on re-auth). `Deliveries` (migration `0006`, `topic_deliveries`) records every torrent pushed to a client — `{topic_id, infohash, label, client_id, delivered_at}`, unique on `(topic_id, infohash)` so `Record` is idempotent (`ON CONFLICT DO NOTHING`). `Notifiers` gains `is_default bool` (migration `0008`, per-type unique partial index) + `Update(ctx, id, userID, name, displayName string, events []string, isDefault bool, configEnc, configNonce []byte) error` |
| **`notify`** | reusable notification dispatcher — `Send(userID, domain.Message)` fans out to all of a user's configured notifiers (best-effort, metered); `SendVia(userID, notifierID, …)` scopes the same fan-out to one notifier when a topic overrides it (nil ⇒ the user's **default** notifiers only (strict; none set ⇒ no send)). Consumers: scheduler new-release alerts + topic-error alerts (both per-topic via `SendVia`; error events deduped to fire once per error episode, on the first failure) + credential session-expiry alerts (global). The single event→notifier fan-out point |
| **`notify`** | reusable notification dispatcher — `Send(userID, domain.Message)` fans out to all of a user's configured notifiers (best-effort, metered); `SendVia(userID, notifierID, …)` scopes the same fan-out to one notifier when a topic overrides it (nil ⇒ the user's **default** notifiers only (strict; none set ⇒ no send)). Consumers: the `events.Bus` fan-out (per-event subscription, wired via `subscribed()` which maps typed `events.Type` to notifier event list; legacy `['updated','error']` rows kept working via dispatcher aliases). The single event→notifier fan-out point. Error events are deduped to fire once per error episode (on the first failure); session-expiry alerts are global one-shot |
| `domain` | core types: `Topic` (incl. per-topic `ClientID`, **`NotifierID`** (per-topic notifier override, migration `0007`), `DownloadDir`, `Category`, `ImageURL`), `Check`, `Payload`, `TrackerCredential`, `TopicDelivery`, `AddOptions` (`DownloadDir` + `Category`) |
| **`topics`** | shared `BuildAndCreate(store, CreateInput)` — the one tracker-match → `Parse` → fail-open `ResolveMetadata` → build → persist sequence, used by BOTH the `POST /topics` handler and the Sonarr poller (no duplication). Idempotent: a `(user_id,url)` unique-violation returns `Result{Created:false}` instead of erroring. Sentinels `ErrNoTracker`/`ErrParse`/`ErrQualityUnsupported` |
| **`events`** | canonical event taxonomy (`Type` consts: topic.added, check.{started,completed,failed}, release.found, download.submitted, session.expired, download.{progress,completed}) + per-type `Policy` (persist/notifiable/sse) + `Bus.Emit` — the single event→sinks fan-out (history `topic_events`, notifier dispatcher, SSE seam). Phase-1 SSE publisher is nil (Phase 3). Phase-2 `download.{progress,completed}` policy rows exist but nothing emits them yet. `GET /api/v1/topics/{id}/events` endpoint reads the history. Frontend: `components/topics/TopicEventsTimeline`, `components/notifiers/EventPicker`, `lib/events.ts` |
| **`topics`** | shared `BuildAndCreate(store, CreateInput)` — the one tracker-match → `Parse` → fail-open `ResolveMetadata` → build → persist sequence, used by BOTH the `POST /topics` handler and the Sonarr poller (no duplication). Idempotent: a `(user_id,url)` unique-violation returns `Result{Created:false}` instead of erroring. Sentinels `ErrNoTracker`/`ErrParse`/`ErrQualityUnsupported`. **`TopicEvents`** sibling repo (history feed: `Record`/`ListForTopic`/`ListForUserSince`) |
| **`sonarr`** | Sonarr integration (issue #86): a typed read-only API `Client` (`SystemStatus` for the Test button; `GrabHistorySince` reads `eventType=grabbed` history, extracting `data.nzbInfoUrl` — the tracker topic URL, **not** `guid`) + a `Poller` that mirrors the scheduler (ticker loop, ctx-cancel, fail-open). Self-gates on the DB `settings.sonarr_enabled`, resolves the owner (configured admin ⇒ first admin), dedups history by URL, filters by allowed trackers, and auto-creates topics via `topics.BuildAndCreate` with configured default client/category/dir. First enable is go-forward only (cursor stamped, no historical import). Admin config UI: **Integrations** page (`pages/Integrations.tsx` → `components/integrations/SonarrCard`); API `GET/PUT/POST /api/v1/system/sonarr{,/test}` |
| **`infohash`** | derives the BitTorrent v1 infohash (lowercase hex) from a `Payload` — `FromMagnet` (parses `xt=urn:btih:`, hex or base32) / `FromTorrent` (SHA-1 of the bencoded `info` dict via a length-based scanner) / `FromPayload`. The universal key linking a delivery to a client's live torrent status |
| **`extra`** | shared `extra.Int / StringSlice / String` helpers for the untyped `map[string]any` blobs in `Topic.Extra` and `Check.Extra` (added 2026-04-07; **use this instead of writing local helpers**) |
Expand Down Expand Up @@ -91,6 +92,8 @@ techdebt/ Debt-tracking files (one per issue, see global rule)
4. `recordResult` — persists `next_check_at` (with exponential backoff
on errors, capped at 6h) and writes the run summary metrics.

**Event emission:** the scheduler emits typed `events.Event`s via `events.Bus.Emit` at key points: `check.started`/`check.completed`/`check.failed` (per check cycle), `release.found` (per new torrent detected), `download.submitted` (after client send), and `session.expired` (on credential session loss). (`topic.added` is emitted separately by the topics HTTP handler on `POST /topics`, not by the scheduler.) The bus fans out to `topic_events` history table (all events persisted if policy allows), the notifier dispatcher (for event-type subscriptions), and an SSE seam (Phase 3 — publisher nil in Phase 1).

**Per-topic delivery:** `sendViaClient` passes `domain.AddOptions{DownloadDir:
t.DownloadDir, Category: t.Category}` to the client plugin. Category is a
**path segment, not a client-native label**: each client config carries an
Expand Down Expand Up @@ -134,6 +137,10 @@ src/
│ ├── shared/ Reusable across pages
│ │ ├── DeleteConfirm.tsx Two-click destructive confirm (uses useArmedConfirm)
│ │ └── ResourceCard.tsx Slot-based card chrome for list pages
│ ├── topics/ Page-specific topic components
│ │ └── TopicEventsTimeline.tsx Per-topic event history feed (read-only)
│ ├── notifiers/ Page-specific notifier components
│ │ └── EventPicker.tsx Event type checkbox list (for notifier subscription)
│ └── ui/ shadcn primitives — DO NOT hand-edit
├── hooks/ (legacy folder, mostly empty — prefer lib/hooks)
├── i18n/ en/ru dictionaries + useT hook
Expand All @@ -143,6 +150,7 @@ src/
│ ├── prefs.ts zustand store: theme, locale, density
│ ├── queryKeys.ts Centralised React Query key factory (QK)
│ ├── utils.ts cn() helper
│ ├── events.ts Event type defs + `eventLabel()` i18n helper
│ └── hooks/
│ ├── useArmedConfirm.ts Two-state idle⇄armed machine with timeout
│ ├── useDebouncedValue.ts Generic debounce for query inputs
Expand Down
11 changes: 8 additions & 3 deletions backend/cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/artyomsv/marauder/backend/internal/db"
"github.com/artyomsv/marauder/backend/internal/db/repo"
"github.com/artyomsv/marauder/backend/internal/domain"
"github.com/artyomsv/marauder/backend/internal/events"
"github.com/artyomsv/marauder/backend/internal/logging"
"github.com/artyomsv/marauder/backend/internal/notify"
"github.com/artyomsv/marauder/backend/internal/scheduler"
Expand Down Expand Up @@ -140,8 +141,10 @@ func run() error {
}

// Scheduler
topicEventsRepo := repo.NewTopicEvents(pool)
disp := notify.New(notifiersRepo, master, logger)
sch := scheduler.New(cfg, logger, topicsRepo, clientsRepo, credsRepo, deliveriesRepo, master, disp)
bus := events.New(topicEventsRepo, disp, nil, logger) // SSE publisher nil until Phase 3
sch := scheduler.New(cfg, logger, topicsRepo, clientsRepo, credsRepo, deliveriesRepo, master, bus)
go func() {
if err := sch.Start(rootCtx); err != nil {
logger.Error().Err(err).Msg("scheduler exited with error")
Expand Down Expand Up @@ -170,12 +173,14 @@ func run() error {
Clients: clientsRepo,
Notifiers: notifiersRepo,
Creds: credsRepo,
Deliveries: deliveriesRepo,
Settings: settingsRepo,
Deliveries: deliveriesRepo,
TopicEvents: topicEventsRepo,
Settings: settingsRepo,
Audit: auditRepo,
AuditLog: auditLogger,
OIDC: oidcProvider,
Scheduler: sch,
Emit: bus.Emit,
})
srv := &http.Server{
Addr: cfg.HTTPAddr,
Expand Down
45 changes: 35 additions & 10 deletions backend/internal/api/handlers/notifiers.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import (
"encoding/json"
"errors"
"net/http"
"sort"

"github.com/go-chi/chi/v5"
"github.com/google/uuid"

"github.com/artyomsv/marauder/backend/internal/crypto"
"github.com/artyomsv/marauder/backend/internal/db/repo"
"github.com/artyomsv/marauder/backend/internal/domain"
"github.com/artyomsv/marauder/backend/internal/events"
"github.com/artyomsv/marauder/backend/internal/plugins/registry"
"github.com/artyomsv/marauder/backend/internal/problem"
)
Expand Down Expand Up @@ -54,6 +56,35 @@ func notifierToView(n *domain.Notifier) notifierView {
}
}

// validNotifierEvents keeps only events a notifier may subscribe to:
// the canonical notifiable set, plus the legacy "updated"/"error" keywords
// (still honored via dispatcher aliases). Empty input defaults to the full
// canonical notifiable set. Unknown/non-notifiable events are dropped.
func validNotifierEvents(in []string) []string {
allowed := map[string]bool{"updated": true, "error": true}
var canonical []string
for _, ty := range events.NotifiableTypes() {
allowed[string(ty)] = true
canonical = append(canonical, string(ty))
}
sort.Strings(canonical)
if len(in) == 0 {
return canonical
}
out := make([]string, 0, len(in))
seen := map[string]bool{}
for _, e := range in {
if allowed[e] && !seen[e] {
out = append(out, e)
seen[e] = true
}
}
if len(out) == 0 {
return canonical
}
return out
}

// List handles GET /notifiers.
func (h *Notifiers) List(w http.ResponseWriter, r *http.Request) {
uid, perr := currentUserID(r)
Expand Down Expand Up @@ -111,17 +142,14 @@ func (h *Notifiers) Create(w http.ResponseWriter, r *http.Request) {
problem.Write(w, r, h.BaseURL, problem.ErrInternal("encrypt: "+err.Error()))
return
}
events := req.Events
if len(events) == 0 {
events = []string{"updated", "error"}
}
evts := validNotifierEvents(req.Events)
created, cerr := h.Notifiers.Create(r.Context(), &domain.Notifier{
UserID: uid,
NotifierName: req.NotifierName,
DisplayName: req.DisplayName,
ConfigEnc: enc,
ConfigNonce: nonce,
Events: events,
Events: evts,
IsDefault: req.IsDefault,
})
if cerr != nil {
Expand Down Expand Up @@ -211,11 +239,8 @@ func (h *Notifiers) Update(w http.ResponseWriter, r *http.Request) {
problem.Write(w, r, h.BaseURL, problem.ErrInternal("encrypt: "+eerr.Error()))
return
}
events := req.Events
if len(events) == 0 {
events = []string{"updated", "error"}
}
if err := h.Notifiers.Update(r.Context(), id, uid, existing.NotifierName, req.DisplayName, events, req.IsDefault, enc, nonce); err != nil {
evts := validNotifierEvents(req.Events)
if err := h.Notifiers.Update(r.Context(), id, uid, existing.NotifierName, req.DisplayName, evts, req.IsDefault, enc, nonce); err != nil {
if errors.Is(err, repo.ErrNotFound) {
problem.Write(w, r, h.BaseURL, problem.ErrNotFound("notifier not found"))
return
Expand Down
18 changes: 18 additions & 0 deletions backend/internal/api/handlers/notifiers_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,21 @@ func TestNotifiers_Update_PassesIsDefault(t *testing.T) {
t.Errorf("Update not called with is_default=true (called=%v def=%v)", store.updateCalled, store.updateIsDef)
}
}

func TestValidNotifierEvents_FiltersAndDefaults(t *testing.T) {
// empty -> full canonical notifiable set (5 entries)
if got := validNotifierEvents(nil); len(got) != 5 {
t.Errorf("default set size = %d, want 5", len(got))
}
// drops legacy 'updated' is allowed-through (kept for back-compat) but
// unknown junk is dropped
got := validNotifierEvents([]string{"release.found", "bogus.event", "download.completed"})
for _, e := range got {
if e == "bogus.event" {
t.Errorf("bogus event should be dropped: %v", got)
}
}
if len(got) != 2 {
t.Errorf("got %v, want [release.found download.completed]", got)
}
}
76 changes: 76 additions & 0 deletions backend/internal/api/handlers/topic_events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package handlers

import (
"context"
"net/http"
"strconv"

"github.com/go-chi/chi/v5"
"github.com/google/uuid"

"github.com/artyomsv/marauder/backend/internal/domain"
"github.com/artyomsv/marauder/backend/internal/problem"
)

// topicEventsStore is the consumer seam over *repo.TopicEvents.
type topicEventsStore interface {
ListForTopic(ctx context.Context, topicID, userID uuid.UUID, limit int, beforeID int64) ([]*domain.TopicEvent, error)
}

// topicOwnerStore verifies topic ownership before returning its history.
type topicOwnerStore interface {
GetByID(ctx context.Context, id uuid.UUID, userID *uuid.UUID) (*domain.Topic, error)
}

// TopicEvents handles GET /topics/{id}/events.
type TopicEvents struct {
Events topicEventsStore
Topics topicOwnerStore
BaseURL string
}

type topicEventView struct {
ID int64 `json:"id"`
EventType string `json:"event_type"`
Severity string `json:"severity"`
Message string `json:"message"`
Data map[string]any `json:"data,omitempty"`
CreatedAt string `json:"created_at"`
}

// List handles GET /topics/{id}/events?limit=&before=.
func (h *TopicEvents) List(w http.ResponseWriter, r *http.Request) {
uid, perr := currentUserID(r)
if perr != nil {
problem.Write(w, r, h.BaseURL, perr)
return
}
id, ierr := uuid.Parse(chi.URLParam(r, "id"))
if ierr != nil {
problem.Write(w, r, h.BaseURL, problem.ErrBadRequest("invalid id"))
return
}
if _, err := h.Topics.GetByID(r.Context(), id, &uid); err != nil {
problem.Write(w, r, h.BaseURL, problem.ErrNotFound("topic not found"))
return
}
limit, _ := strconv.Atoi(r.URL.Query().Get("limit"))
before, _ := strconv.ParseInt(r.URL.Query().Get("before"), 10, 64)
rows, err := h.Events.ListForTopic(r.Context(), id, uid, limit, before)
if err != nil {
problem.Write(w, r, h.BaseURL, problem.ErrInternal(err.Error()))
return
}
out := make([]topicEventView, 0, len(rows))
for _, e := range rows {
out = append(out, topicEventView{
ID: e.ID,
EventType: e.EventType,
Severity: e.Severity,
Message: e.Message,
Data: e.Data,
CreatedAt: e.CreatedAt.UTC().Format("2006-01-02T15:04:05Z"),
})
}
writeJSON(w, http.StatusOK, map[string]any{"events": out})
}
95 changes: 95 additions & 0 deletions backend/internal/api/handlers/topic_events_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package handlers

import (
"context"
"encoding/json"
"errors"
"net/http/httptest"
"testing"
"time"

"github.com/google/uuid"

"github.com/artyomsv/marauder/backend/internal/domain"
)

// fakeTopicEventsStore is a stub topicEventsStore for handler tests.
type fakeTopicEventsStore struct {
rows []*domain.TopicEvent
err error
}

func (f *fakeTopicEventsStore) ListForTopic(_ context.Context, _, _ uuid.UUID, _ int, _ int64) ([]*domain.TopicEvent, error) {
return f.rows, f.err
}

// fakeTopicOwner is a stub topicOwnerStore for handler tests.
type fakeTopicOwner struct {
ok bool
}

func (f *fakeTopicOwner) GetByID(_ context.Context, _ uuid.UUID, _ *uuid.UUID) (*domain.Topic, error) {
if f.ok {
return &domain.Topic{ID: uuid.New()}, nil
}
return nil, errors.New("not found")
}

// topicEventsResp mirrors the JSON shape the handler returns.
type topicEventsResp struct {
Events []struct {
ID int64 `json:"id"`
EventType string `json:"event_type"`
Severity string `json:"severity"`
Message string `json:"message"`
Data map[string]any `json:"data,omitempty"`
CreatedAt string `json:"created_at"`
} `json:"events"`
}

// TestTopicEvents_List_ReturnsTopicHistory verifies that a GET on an owned
// topic returns 200 with events ordered as the store delivered them.
func TestTopicEvents_List_ReturnsTopicHistory(t *testing.T) {
tid := uuid.New()
store := &fakeTopicEventsStore{rows: []*domain.TopicEvent{
{ID: 2, TopicID: tid, EventType: "release.found", Severity: "info", Message: "New release", CreatedAt: time.Now()},
{ID: 1, TopicID: tid, EventType: "check.failed", Severity: "error", Message: "boom", CreatedAt: time.Now()},
}}
h := &TopicEvents{Events: store, Topics: &fakeTopicOwner{ok: true}, BaseURL: ""}

w := httptest.NewRecorder()
req := withURLParam(authedReq(t, uuid.New(), nil), "id", tid.String())
h.List(w, req)

if w.Code != 200 {
t.Fatalf("status %d, want 200; body %s", w.Code, w.Body.String())
}
var resp topicEventsResp
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("decode: %v", err)
}
if len(resp.Events) != 2 {
t.Fatalf("got %d events, want 2", len(resp.Events))
}
if resp.Events[0].EventType != "release.found" {
t.Errorf("events[0].event_type = %q, want release.found", resp.Events[0].EventType)
}
if resp.Events[1].EventType != "check.failed" {
t.Errorf("events[1].event_type = %q, want check.failed", resp.Events[1].EventType)
}
}

// TestTopicEvents_List_NotOwnedTopic verifies that when GetByID errors (topic
// not found or not owned), the handler returns 404.
func TestTopicEvents_List_NotOwnedTopic(t *testing.T) {
store := &fakeTopicEventsStore{}
h := &TopicEvents{Events: store, Topics: &fakeTopicOwner{ok: false}, BaseURL: ""}

w := httptest.NewRecorder()
req := withURLParam(authedReq(t, uuid.New(), nil), "id", uuid.New().String())
h.List(w, req)

if w.Code != 404 {
t.Fatalf("status %d, want 404; body %s", w.Code, w.Body.String())
}
}
Loading