Skip to content

feat(orchestrator): add audit sink and event bus packages#16

Open
AiRanthem wants to merge 5 commits into
masterfrom
feature/audit-events-for-orchestrator-260511
Open

feat(orchestrator): add audit sink and event bus packages#16
AiRanthem wants to merge 5 commits into
masterfrom
feature/audit-events-for-orchestrator-260511

Conversation

@AiRanthem

@AiRanthem AiRanthem commented May 11, 2026

Copy link
Copy Markdown
Owner

Summary

  • Introduce pkg/orchestrator/audit — synchronous, fail-fast audit sink. Defines the Sink interface, EventRecord / TranscriptRecord shapes, the EventType taxonomy, an in-memory reference sink, a fail-fast Multi() fan-out, transcript constructors for the fully-rendered InvokeRequest shape required by DESIGN.md §9, and a redaction round-trip that preserves replay-critical structural metadata even when policies mutate input maps/slices in place.
  • Introduce pkg/orchestrator/events — best-effort runtime event bus. Task-scoped subscriptions with optional SessionID / RequestID filters, per-subscriber bounded channels with non-blocking Publish, drop accounting via inline SubscriberLaggedError, and terminal-event-driven subscription close. The EventType taxonomy is kept in lockstep with audit.EventType (the AGENTS.md no-cross-import rule forbids sharing it through a common package).
  • Align DESIGN.md and per-package PLAN.md with the shipped code. Adds the Seq field (uint64) to the §8.2 Event schema and the §9 transcript table; declares the previously-leaked sentinels (ErrNoSink, ErrRedactionStructure) and methods (Subscription.Dropped()); rewrites the events Subscriber-concurrency / Drop-policy / empty-TaskID sections to match the channel-plus-mutex implementation; archives the implementation plan under docs/superpowers/plans/ following repo convention.

Notes for reviewers

  • Two leaf packages, no orchestrator engine wiring yet. Both only depend on pkg/orchestrator/idgen per the dependency graph in DESIGN.md §13.1.
  • audit is the synchronous fail-fast surface; events is the non-blocking best-effort surface. They share the EventType constants by duplication — both blocks carry a sync comment.
  • audit.ApplyRedaction defensively clones nested map[string]any via reflection to prevent policies from corrupting structural metadata. TestRedactionPolicy_InPlaceMutationCannotCorruptStructuralMetadata exercises nested map / slice / typed-map paths.

Test plan

  • CI: gofmt -s -l clean, go vet ./... clean
  • go test ./pkg/orchestrator/audit/... ./pkg/orchestrator/events/... -race -count=1 passes
  • go build ./... clean
  • Manual: verify TestMemorySink_ConcurrentWritesAreSafe and TestBus_ConcurrentPublishSubscribe pass under -race
  • Manual: confirm DESIGN.md §8.2 and both PLAN.md files describe the same Event/Subscription surface as the code

AiRanthem added 3 commits May 12, 2026 02:42
Introduces pkg/orchestrator/audit, the synchronous, durable audit surface
for the orchestrator. Defines the Sink interface, EventRecord and
TranscriptRecord shapes, the EventType taxonomy (kept in lockstep with
events.EventType per AGENTS.md no-cross-import rule), an in-memory
reference sink for tests, a fail-fast Multi() fan-out helper, transcript
constructors for the fully-rendered InvokeRequest shape required by
DESIGN.md §9, and a redaction round-trip that preserves replay-critical
structural metadata even against in-place mutation by policies.

Tests cover per-task ordering, transcript insertion order, Multi
fail-fast through the real pipeline, redaction structure preservation
(including nested map/slice types), concurrent writers under -race, and
closed-sink rejection.
Introduces pkg/orchestrator/events, the non-blocking in-process event
bus that complements the audit sink. Defines the Bus and Subscription
interfaces with task-scoped subscriptions, optional SessionID/RequestID
filters, IncludeChunks suppression, per-subscriber bounded channels with
non-blocking Publish, a SubscriberLaggedError surfaced inline on the
Errors() channel (latest cumulative Dropped() count preserved), and
terminal-event-driven subscription close. The EventType taxonomy is
kept in lockstep with audit.EventType per the AGENTS.md no-cross-import
rule.

Tests cover single and multi-event ordering, session/request filtering,
chunk suppression, slow-subscriber drop accounting, terminal close
semantics, post-terminal subscribe rejection, idempotent close, publish
after bus close, and concurrent publishers/subscribers under -race
(with buffer headroom and pre-drain Dropped() assertion to avoid
tight-buffer timeout traps).
…ation

Reflects the shipped audit and events packages in the cross-module
authoritative documents:

- DESIGN.md §8.2: add Seq (uint64) to the Event schema and use uint64
  in the §9 transcript record table so audit and bus share width.
- audit/PLAN.md: document the Seq field in EventRecord, declare
  ErrNoSink and ErrRedactionStructure in the sentinel list, and finalize
  the Tests section with the transcript-shape and redaction tests that
  ship alongside the implementation.
- events/PLAN.md: declare Subscription.Dropped() in the public surface,
  document SubscribeOptions.SessionID / RequestID filters, rewrite the
  Subscriber concurrency section to match the channel + mutex
  implementation (replacing the earlier per-subscriber goroutine /
  sync.Map sketch), align Drop policy and Publish empty-TaskID semantics
  with the code, and add the session/request scoped test case.
- docs/superpowers/plans/2026-05-12-audit-events-review-fixes.md:
  archive the implementation plan that drove the doc/code reconciliation
  pass, following the repo convention from 2026-05-11-orchestrator-leaf-packages.md.

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 77d1baffbd

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread pkg/orchestrator/audit/audit.go
Comment thread pkg/orchestrator/events/events.go Outdated
Comment thread pkg/orchestrator/events/events.go Outdated
Comment thread pkg/orchestrator/audit/audit.go Outdated
Comment thread pkg/orchestrator/events/events.go
Comment thread pkg/orchestrator/events/events.go Outdated

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 1ba678f578

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread pkg/orchestrator/audit/audit.go
Comment thread pkg/orchestrator/audit/audit.go Outdated
Comment thread pkg/orchestrator/audit/audit.go Outdated
Comment thread pkg/orchestrator/events/events_test.go Outdated
@AiRanthem

Copy link
Copy Markdown
Owner Author

@codex review

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 32853dfb6e

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

}
terminal := isTerminalTaskEvent(event.Type)
if terminal {
b.terminal[event.TaskID] = struct{}{}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Bound terminal-task tracking to prevent unbounded growth

Avoid retaining every finished task ID forever: each terminal publish stores event.TaskID in b.terminal, but nothing ever removes entries, so a long-running orchestrator leaks one map entry per completed/failed/cancelled task. In production with many tasks this causes unbounded memory growth in the event bus; consider expiring or bounding terminal-task state instead of permanent retention.

Useful? React with 👍 / 👎.

Comment thread pkg/orchestrator/events/events.go Outdated
Comment on lines +455 to +457
for iter.Next() {
cloned.SetMapIndex(cloneReflectValue(iter.Key()), cloneReflectValue(iter.Value()))
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Guard redaction cloning against cyclic diagnostic values

ApplyRedaction deep-clones PromptDiagnostics via recursive reflection, but the map/slice clone path has no cycle detection, so a self-referential value (for example m["self"] = m) recurses indefinitely and can crash the process with stack overflow. Since PromptDiagnostics is map[string]any, this input shape is possible from callers; the clone logic should detect cycles and return an error instead of panicking.

Useful? React with 👍 / 👎.

Comment thread pkg/orchestrator/events/events_test.go Outdated

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 87482c33e7

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread pkg/orchestrator/audit/audit.go Outdated
Comment thread pkg/orchestrator/events/events.go Outdated
Comment thread pkg/orchestrator/events/events.go Outdated
Comment thread pkg/orchestrator/audit/audit.go Outdated
@AiRanthem AiRanthem force-pushed the feature/audit-events-for-orchestrator-260511 branch from 87482c3 to 392acc2 Compare May 13, 2026 17:05

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 392acc2282

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

return publishInactive
}
select {
case s.events <- event:

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Clone mutable payloads before enqueuing events

subscription.publish sends the Event struct by value, but Event.Payload is any and can hold mutable references (maps/slices/pointers). Because Publish is non-blocking, publishers can mutate those objects after Publish returns, and subscribers may then observe post-publish mutations (or hit data races) instead of a stable snapshot of the event at publish time. Snapshotting or serializing payloads before channel send would prevent cross-goroutine aliasing.

Useful? React with 👍 / 👎.

Comment thread pkg/orchestrator/audit/audit.go Outdated
Comment on lines +377 to +382
s.closed = true

var errs []error
for i, sink := range s.sinks {
if err := sink.Close(ctx); err != nil {
errs = append(errs, fmt.Errorf("close audit sink %d: %w", i, err))

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Defer closed-state until child sink closes succeed

multiSink.Close marks the wrapper closed before attempting child Close calls, so if any child returns an error (for example with a canceled/deadline context), subsequent Close retries short-circuit at the s.closed check and never reattempt failed child cleanup. This can leave underlying sinks unflushed/open while reporting a logically closed wrapper; keep retryability by only finalizing closed state after child close attempts are successfully completed (or by tracking per-child close status).

Useful? React with 👍 / 👎.

Comment on lines +147 to +149
for _, sub := range matchedSubscribers {
switch sub.publish(event) {
case publishDelivered:

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Serialize per-task delivery to preserve event sequence

Concurrent Publish calls can deliver out of sequence to a subscriber because each call snapshots subscribers under b.mu, then releases that lock and races to acquire each subscription mutex; a later event can acquire sub.mu first and be enqueued before an earlier event. This violates the expected ordered stream semantics for consumers (the plan explicitly tests ordered delivery) and can produce non-monotonic Seq observations under concurrent publishers.

Useful? React with 👍 / 👎.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant