
Add tracer facade implementation#84

Open
jqdsouza wants to merge 6 commits into main from add-tracer-facade


@jqdsouza jqdsouza commented May 31, 2026

Contributor

Note

Medium Risk
New concurrency-heavy tracing path with buffered drops and shutdown ordering; mistakes could lose traces or mishandle sensitive capture data, though behavior is heavily tested.

Overview
Introduces a tracer facade in pkg/asymptotetrace that agents can use to record harness activity without blocking on I/O.

Start spins up a background worker with a buffered command queue. It defaults to the local origin, a 1024-slot buffer, and, when no sink is configured, a JSONL sink at ./asymptote-trace.jsonl. Observe enqueues Capture values asynchronously: a capture is either accepted or dropped when the buffer is full, and context cancellation is honored at send time. The worker maps each capture to a canonical Event (including optional privacy processing) and writes it via the configured Sink. Flush and Shutdown use barrier commands; shutdown is signaled through a dedicated stopCh so it does not deadlock when the command buffer is full, and finalize drains pending work before flush/close. Stats exposes accepted, dropped, and sink error counts.

Tests cover defaults, non-blocking observe, flush/shutdown under contention, buffer drops, privacy redaction, and concurrent use.

Reviewed by Cursor Bugbot for commit efae041. Bugbot is set up for automated code reviews on this repo.

Comment thread pkg/asymptotetrace/tracer.go Outdated
Comment thread pkg/asymptotetrace/tracer.go
Bug 1: In run(), when the stop command is processed, drain any remaining
captures from the channel that raced with stop (enqueued after stop but
before accepting was set to false). This prevents silently dropping
captures that were reported as accepted.

Bug 2: In Shutdown() and sendBarrier(), when <-t.done wins the select
over <-ack, perform a non-blocking read of the ack channel to retrieve
any error that was sent before done closed. This prevents ignoring
flush/close errors during shutdown.
Comment thread pkg/asymptotetrace/tracer.go Outdated
During Shutdown, the stop handler's drain loop was acknowledging
queued Flush commands with nil without calling Sink.Flush(), then
performing a single flush later. This allowed a concurrent Flush
caller to return success before the sink was actually flushed,
breaking barrier semantics and risking lost trace data.

Fix by invoking Sink.Flush() for flush commands encountered during
the drain loop, matching the normal flush handling behavior.
Comment thread pkg/asymptotetrace/tracer.go Outdated
Shutdown previously sent a stop command through the buffered command
channel. If the worker was blocked on a slow WriteBatch and the buffer
was full (filled by Observe calls), the stop command could not be
enqueued, creating a circular wait: shutdown waits for channel space,
the worker waits for the sink, and nothing drains the queue.

Replace the in-band stop command with a dedicated stopCh channel that
Shutdown closes. The run loop selects on both the command channel and
stopCh, so shutdown signaling never competes for buffer space. The
shutdown error is communicated via a shutdownErr field written before
close(done), which establishes a happens-before relationship for
readers that wait on <-done.
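
The out-of-band stop signal and the happens-before guarantee can be sketched as follows. This is a reduced model under stated assumptions: `worker`, `newWorker`, and the fixed error value are hypothetical, and `shutdown` is assumed to be called at most once (the PR guards repeated calls with a closing flag and a mutex).

```go
package main

import (
	"errors"
	"fmt"
)

// worker is a hypothetical, reduced model of the tracer's run loop.
type worker struct {
	commands chan int
	stopCh   chan struct{}
	done     chan struct{}
	err      error // written by run before close(done); read only after <-done
}

func newWorker(buffer int) *worker {
	return &worker{
		commands: make(chan int, buffer),
		stopCh:   make(chan struct{}),
		done:     make(chan struct{}),
	}
}

func (w *worker) run() {
	defer close(w.done) // runs after w.err is assigned below
	for {
		select {
		case <-w.commands:
			// A real worker would write to the sink here.
		case <-w.stopCh:
			w.err = errors.New("final flush failed")
			return
		}
	}
}

// shutdown signals the worker out of band. close never blocks, so a full
// command buffer cannot prevent the signal from being delivered.
func (w *worker) shutdown() error {
	close(w.stopCh)
	<-w.done     // the close of done happens after the write to w.err
	return w.err // safe to read without a lock
}

func main() {
	w := newWorker(1)
	w.commands <- 1 // the buffer is now full
	go w.run()
	fmt.Println(w.shutdown())
}
```

Because closing a channel does not need buffer space, the circular wait described above cannot form, and the Go memory model guarantees that a receive observing the closed `done` channel also observes the earlier write to `err`.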
Comment thread pkg/asymptotetrace/tracer.go

@cursor cursor Bot left a comment


Cursor Bugbot has reviewed your changes and found 1 potential issue.


Bugbot Autofix prepared a fix for the issue found in the latest run.

  • ✅ Fixed: Flush hides shutdown failure
    • Changed the first select's t.done branch in sendBarrier to return t.shutdownErr (or ErrTracerClosed if nil) instead of nil, matching the error propagation pattern of the other t.done branches in the same function.
Preview (efae041643)
diff --git a/pkg/asymptotetrace/tracer.go b/pkg/asymptotetrace/tracer.go
new file mode 100644
--- /dev/null
+++ b/pkg/asymptotetrace/tracer.go
@@ -1,0 +1,408 @@
+package asymptotetrace
+
+import (
+	"context"
+	"errors"
+	"sync"
+	"sync/atomic"
+	"time"
+)
+
+const (
+	DefaultBufferSize = 1024
+	DefaultTracePath  = "./asymptote-trace.jsonl"
+)
+
+var (
+	ErrHarnessRequired = errors.New("harness is required")
+	ErrTracerClosed    = errors.New("tracer is closed")
+)
+
+type Options struct {
+	Harness    string
+	Origin     Origin
+	Session    *SessionInfo
+	Run        *RunInfo
+	Sink       Sink
+	Path       string
+	Privacy    *PrivacyPolicy
+	BufferSize int
+}
+
+type Capture struct {
+	Action   string
+	Category string
+	Severity Severity
+	Time     time.Time
+	Message  string
+	Input    interface{}
+	Output   interface{}
+	Error    error
+	Raw      map[string]interface{}
+}
+
+type ObserveResult struct {
+	Accepted bool
+	Dropped  bool
+}
+
+type TracerStats struct {
+	Accepted  uint64
+	Dropped   uint64
+	Errors    uint64
+	LastError string
+}
+
+type Tracer struct {
+	opts     Options
+	commands chan tracerCommand
+	done     chan struct{}
+	stopCh   chan struct{}
+
+	accepting   atomic.Bool
+	closing     atomic.Bool
+	closeMu     sync.Mutex
+	shutdownErr error
+	commandMu   sync.Mutex
+
+	accepted     atomic.Uint64
+	dropped      atomic.Uint64
+	errors       atomic.Uint64
+	pendingSends atomic.Int64
+	errMu        sync.Mutex
+	lastError    string
+	sendDone     chan struct{}
+}
+
+type tracerCommand struct {
+	kind    tracerCommandKind
+	capture Capture
+	ack     chan error
+}
+
+type tracerCommandKind int
+
+const (
+	tracerCommandCapture tracerCommandKind = iota
+	tracerCommandFlush
+	tracerCommandStop
+)
+
+func Start(opts Options) (*Tracer, error) {
+	normalized, err := opts.withDefaults()
+	if err != nil {
+		return nil, err
+	}
+	tracer := &Tracer{
+		opts:     normalized,
+		commands: make(chan tracerCommand, normalized.BufferSize),
+		done:     make(chan struct{}),
+		stopCh:   make(chan struct{}),
+		sendDone: make(chan struct{}, 1),
+	}
+	tracer.accepting.Store(true)
+	go tracer.run()
+	return tracer, nil
+}
+
+func (t *Tracer) Observe(ctx context.Context, capture Capture) (ObserveResult, error) {
+	if ctx == nil {
+		ctx = context.Background()
+	}
+	if !t.accepting.Load() {
+		t.dropped.Add(1)
+		return ObserveResult{Dropped: true}, ErrTracerClosed
+	}
+	if !t.beginCommandSend() {
+		t.dropped.Add(1)
+		return ObserveResult{Dropped: true}, ErrTracerClosed
+	}
+	defer t.finishCommandSend()
+	select {
+	case t.commands <- tracerCommand{kind: tracerCommandCapture, capture: capture}:
+		t.accepted.Add(1)
+		return ObserveResult{Accepted: true}, nil
+	case <-ctx.Done():
+		return ObserveResult{}, ctx.Err()
+	default:
+		t.dropped.Add(1)
+		return ObserveResult{Dropped: true}, nil
+	}
+}
+
+func (t *Tracer) Flush(ctx context.Context) error {
+	if ctx == nil {
+		ctx = context.Background()
+	}
+	return t.sendBarrier(ctx, tracerCommandFlush)
+}
+
+func (t *Tracer) Shutdown(ctx context.Context) error {
+	if ctx == nil {
+		ctx = context.Background()
+	}
+	if t.closing.Load() {
+		select {
+		case <-t.done:
+			return t.shutdownErr
+		case <-ctx.Done():
+			return ctx.Err()
+		}
+	}
+	t.closeMu.Lock()
+	defer t.closeMu.Unlock()
+	if t.closing.Load() {
+		select {
+		case <-t.done:
+			return t.shutdownErr
+		case <-ctx.Done():
+			return ctx.Err()
+		}
+	}
+	t.closing.Store(true)
+	t.accepting.Store(false)
+	close(t.stopCh)
+	select {
+	case <-t.done:
+		return t.shutdownErr
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+}
+
+func (t *Tracer) Stats() TracerStats {
+	t.errMu.Lock()
+	lastError := t.lastError
+	t.errMu.Unlock()
+	return TracerStats{
+		Accepted:  t.accepted.Load(),
+		Dropped:   t.dropped.Load(),
+		Errors:    t.errors.Load(),
+		LastError: lastError,
+	}
+}
+
+func (t *Tracer) sendBarrier(ctx context.Context, kind tracerCommandKind) error {
+	ack := make(chan error, 1)
+	if !t.beginCommandSend() {
+		select {
+		case <-t.done:
+			return t.shutdownErr
+		default:
+			return ErrTracerClosed
+		}
+	}
+	select {
+	case <-t.done:
+		t.finishCommandSend()
+		if t.shutdownErr != nil {
+			return t.shutdownErr
+		}
+		return ErrTracerClosed
+	case t.commands <- tracerCommand{kind: kind, ack: ack}:
+		t.finishCommandSend()
+	case <-ctx.Done():
+		t.finishCommandSend()
+		return ctx.Err()
+	}
+	select {
+	case err := <-ack:
+		return err
+	case <-t.done:
+		select {
+		case err := <-ack:
+			return err
+		default:
+			if t.shutdownErr != nil {
+				return t.shutdownErr
+			}
+			return ErrTracerClosed
+		}
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+}
+
+func (t *Tracer) beginCommandSend() bool {
+	t.commandMu.Lock()
+	defer t.commandMu.Unlock()
+	if t.closing.Load() {
+		return false
+	}
+	t.pendingSends.Add(1)
+	return true
+}
+
+func (t *Tracer) finishCommandSend() {
+	t.pendingSends.Add(-1)
+	select {
+	case t.sendDone <- struct{}{}:
+	default:
+	}
+}
+
+func (t *Tracer) run() {
+	defer close(t.done)
+	for {
+		select {
+		case command := <-t.commands:
+			switch command.kind {
+			case tracerCommandCapture:
+				if err := t.writeCapture(context.Background(), command.capture); err != nil {
+					t.recordError(err)
+				}
+			case tracerCommandFlush:
+				command.ack <- t.opts.Sink.Flush(context.Background())
+			case tracerCommandStop:
+				t.shutdownErr = t.finalize()
+				command.ack <- t.shutdownErr
+				return
+			}
+		case <-t.stopCh:
+			t.shutdownErr = t.finalize()
+			return
+		}
+	}
+}
+
+func (t *Tracer) finalize() error {
+	for {
+		select {
+		case cmd := <-t.commands:
+			t.handleFinalCommand(cmd)
+		default:
+			if t.pendingSends.Load() != 0 {
+				select {
+				case cmd := <-t.commands:
+					t.handleFinalCommand(cmd)
+				case <-t.sendDone:
+				}
+				continue
+			}
+			err := t.opts.Sink.Flush(context.Background())
+			if closeErr := t.opts.Sink.Close(); closeErr != nil {
+				err = errors.Join(err, closeErr)
+			}
+			return err
+		}
+	}
+}
+
+func (t *Tracer) handleFinalCommand(cmd tracerCommand) {
+	switch cmd.kind {
+	case tracerCommandCapture:
+		if err := t.writeCapture(context.Background(), cmd.capture); err != nil {
+			t.recordError(err)
+		}
+	case tracerCommandFlush:
+		cmd.ack <- t.opts.Sink.Flush(context.Background())
+	case tracerCommandStop:
+		cmd.ack <- nil
+	}
+}
+
+func (t *Tracer) writeCapture(ctx context.Context, capture Capture) error {
+	event := t.eventFromCapture(capture)
+	if err := event.Validate(); err != nil {
+		return err
+	}
+	events := []Event{event}
+	var err error
+	for _, processor := range t.opts.processors() {
+		events, err = processor.Process(ctx, events)
+		if err != nil {
+			return err
+		}
+	}
+	if len(events) == 0 {
+		return nil
+	}
+	return t.opts.Sink.WriteBatch(ctx, events)
+}
+
+func (t *Tracer) eventFromCapture(capture Capture) Event {
+	action := capture.Action
+	if action == "" {
+		action = UnclassifiedTraceAction
+	}
+	category := capture.Category
+	if category == "" {
+		category = "trace"
+	}
+	message := capture.Message
+	if capture.Error != nil && message == "" {
+		message = capture.Error.Error()
+	}
+	event := NewEvent(NewEventOptions{
+		Action:   action,
+		Category: category,
+		Severity: capture.Severity,
+		Harness:  HarnessInfo{Name: t.opts.Harness},
+		Message:  message,
+		Origin:   t.opts.Origin,
+		Run:      cloneRun(t.opts.Run),
+	})
+	if !capture.Time.IsZero() {
+		event.Timestamp = capture.Time.UTC().Format(time.RFC3339)
+	}
+	event.Session = cloneSession(t.opts.Session)
+	event.Raw = captureRaw(capture)
+	return event
+}
+
+func captureRaw(capture Capture) map[string]interface{} {
+	raw := copyMap(capture.Raw)
+	if raw == nil {
+		raw = map[string]interface{}{}
+	}
+	if capture.Input != nil {
+		raw["input"] = capture.Input
+	}
+	if capture.Output != nil {
+		raw["output"] = capture.Output
+	}
+	if capture.Error != nil {
+		raw["error"] = capture.Error.Error()
+	}
+	if len(raw) == 0 {
+		return nil
+	}
+	return raw
+}
+
+func (t *Tracer) recordError(err error) {
+	if err == nil {
+		return
+	}
+	t.errors.Add(1)
+	t.errMu.Lock()
+	t.lastError = err.Error()
+	t.errMu.Unlock()
+}
+
+func (opts Options) withDefaults() (Options, error) {
+	if opts.Harness == "" {
+		return Options{}, ErrHarnessRequired
+	}
+	if opts.Origin == "" {
+		opts.Origin = OriginLocal
+	}
+	if opts.BufferSize <= 0 {
+		opts.BufferSize = DefaultBufferSize
+	}
+	if opts.Sink == nil {
+		path := opts.Path
+		if path == "" {
+			path = DefaultTracePath
+		}
+		opts.Sink = NewJSONLSink(path)
+	}
+	return opts, nil
+}
+
+func (opts Options) processors() []Processor {
+	if opts.Privacy == nil {
+		return nil
+	}
+	return []Processor{NewPrivacyProcessor(*opts.Privacy)}
+}

diff --git a/pkg/asymptotetrace/tracer_test.go b/pkg/asymptotetrace/tracer_test.go
new file mode 100644
--- /dev/null
+++ b/pkg/asymptotetrace/tracer_test.go
@@ -1,0 +1,435 @@
+package asymptotetrace
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"os"
+	"strings"
+	"sync"
+	"testing"
+	"time"
+)
+
+type blockingSink struct {
+	started chan struct{}
+	release chan struct{}
+	mu      sync.Mutex
+	events  []Event
+}
+
+func newBlockingSink() *blockingSink {
+	return &blockingSink{
+		started: make(chan struct{}),
+		release: make(chan struct{}),
+	}
+}
+
+func (s *blockingSink) WriteBatch(ctx context.Context, events []Event) error {
+	select {
+	case <-s.started:
+	default:
+		close(s.started)
+	}
+	select {
+	case <-s.release:
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	for _, event := range events {
+		s.events = append(s.events, copyEvent(event))
+	}
+	return nil
+}
+
+func (s *blockingSink) Flush(context.Context) error { return nil }
+func (s *blockingSink) Close() error                { return nil }
+
+func (s *blockingSink) snapshot() []Event {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	events := make([]Event, len(s.events))
+	copy(events, s.events)
+	return events
+}
+
+type firstWriteBlockingSink struct {
+	started  chan struct{}
+	release  chan struct{}
+	flushErr error
+
+	once       sync.Once
+	mu         sync.Mutex
+	flushCount int
+}
+
+func newFirstWriteBlockingSink(flushErr error) *firstWriteBlockingSink {
+	return &firstWriteBlockingSink{
+		started:  make(chan struct{}),
+		release:  make(chan struct{}),
+		flushErr: flushErr,
+	}
+}
+
+func (s *firstWriteBlockingSink) WriteBatch(ctx context.Context, events []Event) error {
+	block := false
+	s.once.Do(func() {
+		block = true
+		close(s.started)
+	})
+	if block {
+		select {
+		case <-s.release:
+		case <-ctx.Done():
+			return ctx.Err()
+		}
+	}
+	return nil
+}
+
+func (s *firstWriteBlockingSink) Flush(context.Context) error {
+	s.mu.Lock()
+	s.flushCount++
+	s.mu.Unlock()
+	return s.flushErr
+}
+
+func (s *firstWriteBlockingSink) Close() error {
+	return nil
+}
+
+func (s *firstWriteBlockingSink) flushes() int {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.flushCount
+}
+
+func TestStartRejectsMissingHarness(t *testing.T) {
+	if _, err := Start(Options{}); !errors.Is(err, ErrHarnessRequired) {
+		t.Fatalf("Start error = %v, want ErrHarnessRequired", err)
+	}
+}
+
+func TestStartDefaultsAndShutdownWritesJSONL(t *testing.T) {
+	path := t.TempDir() + "/trace.jsonl"
+	tracer, err := Start(Options{Harness: "my-agent", Path: path})
+	if err != nil {
+		t.Fatalf("Start returned error: %v", err)
+	}
+
+	result, err := tracer.Observe(context.Background(), Capture{
+		Action:   "runner.started",
+		Category: "runner",
+		Raw:      map[string]interface{}{"input": "hello"},
+	})
+	if err != nil {
+		t.Fatalf("Observe returned error: %v", err)
+	}
+	if !result.Accepted || result.Dropped {
+		t.Fatalf("Observe result = %#v, want accepted", result)
+	}
+	if err := tracer.Shutdown(context.Background()); err != nil {
+		t.Fatalf("Shutdown returned error: %v", err)
+	}
+
+	events := readJSONLEvents(t, path)
+	if len(events) != 1 {
+		t.Fatalf("events = %d, want 1", len(events))
+	}
+	if events[0].Harness.Name != "my-agent" || events[0].Origin != OriginLocal {
+		t.Fatalf("unexpected harness/origin: %#v", events[0])
+	}
+	if events[0].Raw["input"] != "hello" {
+		t.Fatalf("raw input = %#v, want hello", events[0].Raw)
+	}
+}
+
+func TestStartDefaultsToLocalOriginAndDefaultJSONLSink(t *testing.T) {
+	tracer, err := Start(Options{Harness: "my-agent"})
+	if err != nil {
+		t.Fatalf("Start returned error: %v", err)
+	}
+	defer tracer.Shutdown(context.Background())
+
+	if tracer.opts.Origin != OriginLocal {
+		t.Fatalf("Origin = %q, want local", tracer.opts.Origin)
+	}
+	sink, ok := tracer.opts.Sink.(*JSONLSink)
+	if !ok {
+		t.Fatalf("Sink = %T, want *JSONLSink", tracer.opts.Sink)
+	}
+	if sink.path != DefaultTracePath {
+		t.Fatalf("default path = %q, want %q", sink.path, DefaultTracePath)
+	}
+}
+
+func TestObserveDoesNotCallSinkSynchronously(t *testing.T) {
+	sink := newBlockingSink()
+	tracer, err := Start(Options{Harness: "my-agent", Sink: sink, BufferSize: 4})
+	if err != nil {
+		t.Fatalf("Start returned error: %v", err)
+	}
+	defer func() {
+		close(sink.release)
+		_ = tracer.Shutdown(context.Background())
+	}()
+
+	done := make(chan error, 1)
+	go func() {
+		_, err := tracer.Observe(context.Background(), Capture{Action: "runner.started"})
+		done <- err
+	}()
+
+	select {
+	case err := <-done:
+		if err != nil {
+			t.Fatalf("Observe returned error: %v", err)
+		}
+	case <-time.After(250 * time.Millisecond):
+		t.Fatal("Observe blocked on sink")
+	}
+}
+
+func TestFlushDrainsQueuedCaptures(t *testing.T) {
+	sink := &captureEventSink{}
+	tracer, err := Start(Options{Harness: "my-agent", Sink: sink, BufferSize: 10})
+	if err != nil {
+		t.Fatalf("Start returned error: %v", err)
+	}
+	defer tracer.Shutdown(context.Background())
+
+	for i := 0; i < 3; i++ {
+		if _, err := tracer.Observe(context.Background(), Capture{Action: "runner.started"}); err != nil {
+			t.Fatalf("Observe returned error: %v", err)
+		}
+	}
+	if err := tracer.Flush(context.Background()); err != nil {
+		t.Fatalf("Flush returned error: %v", err)
+	}
+	events, _, _ := sink.snapshot()
+	if len(events) != 3 {
+		t.Fatalf("events = %d, want 3", len(events))
+	}
+}
+
+func TestConcurrentFlushBlockedOnFullBufferIsAckedDuringShutdown(t *testing.T) {
+	want := errors.New("flush barrier ran")
+	sink := newFirstWriteBlockingSink(want)
+	tracer, err := Start(Options{Harness: "my-agent", Sink: sink, BufferSize: 1})
+	if err != nil {
+		t.Fatalf("Start returned error: %v", err)
+	}
+
+	if _, err := tracer.Observe(context.Background(), Capture{Action: "runner.started"}); err != nil {
+		t.Fatalf("Observe returned error: %v", err)
+	}
+	select {
+	case <-sink.started:
+	case <-time.After(time.Second):
+		t.Fatal("sink write did not start")
+	}
+	if _, err := tracer.Observe(context.Background(), Capture{Action: "runner.started"}); err != nil {
+		t.Fatalf("Observe returned error: %v", err)
+	}
+
+	flushDone := make(chan error, 1)
+	go func() {
+		flushDone <- tracer.Flush(context.Background())
+	}()
+	for deadline := time.After(time.Second); tracer.pendingSends.Load() == 0; {
+		select {
+		case <-deadline:
+			t.Fatal("Flush did not block on the full command buffer")
+		default:
+			time.Sleep(time.Millisecond)
+		}
+	}
+
+	shutdownDone := make(chan error, 1)
+	go func() {
+		shutdownDone <- tracer.Shutdown(context.Background())
+	}()
+	close(sink.release)
+
+	select {
+	case err := <-flushDone:
+		if !errors.Is(err, want) {
+			t.Fatalf("Flush error = %v, want %v", err, want)
+		}
+	case <-time.After(time.Second):
+		t.Fatal("Flush was not acked during shutdown")
+	}
+	select {
+	case err := <-shutdownDone:
+		if !errors.Is(err, want) {
+			t.Fatalf("Shutdown error = %v, want %v", err, want)
+		}
+	case <-time.After(time.Second):
+		t.Fatal("Shutdown did not finish")
+	}
+	if sink.flushes() == 0 {
+		t.Fatal("sink Flush was not called")
+	}
+}
+
+func TestObserveDropsWhenBufferFull(t *testing.T) {
+	sink := newBlockingSink()
+	tracer, err := Start(Options{Harness: "my-agent", Sink: sink, BufferSize: 1})
+	if err != nil {
+		t.Fatalf("Start returned error: %v", err)
+	}
+	defer func() {
+		close(sink.release)
+		_ = tracer.Shutdown(context.Background())
+	}()
+
+	if _, err := tracer.Observe(context.Background(), Capture{Action: "runner.started"}); err != nil {
+		t.Fatalf("Observe returned error: %v", err)
+	}
+	select {
+	case <-sink.started:
+	case <-time.After(time.Second):
+		t.Fatal("sink write did not start")
+	}
+	var dropped int
+	for i := 0; i < 100; i++ {
+		result, err := tracer.Observe(context.Background(), Capture{Action: "runner.started"})
+		if err != nil && !errors.Is(err, ErrTracerClosed) {
+			t.Fatalf("Observe returned error: %v", err)
+		}
+		if result.Dropped {
+			dropped++
+		}
+	}
+	if dropped == 0 {
+		t.Fatal("expected at least one dropped capture")
+	}
+	if stats := tracer.Stats(); stats.Dropped == 0 {
+		t.Fatalf("Dropped stats = 0, want drops: %#v", stats)
+	}
+}
+
+func readJSONLEvents(t *testing.T, path string) []Event {
+	t.Helper()
+	data, err := os.ReadFile(path)
+	if err != nil {
+		t.Fatalf("read JSONL: %v", err)
+	}
+	var events []Event
+	for _, line := range strings.Split(strings.TrimSpace(string(data)), "\n") {
+		if line == "" {
+			continue
+		}
+		var event Event
+		if err := json.Unmarshal([]byte(line), &event); err != nil {
+			t.Fatalf("unmarshal JSONL event: %v line=%q", err, line)
+		}
+		events = append(events, event)
+	}
+	return events
+}
+
+func TestTracerPrivacyMetadataRemovesInputOutput(t *testing.T) {
+	sink := &captureEventSink{}
+	tracer, err := Start(Options{
+		Harness: "my-agent",
+		Sink:    sink,
+		Privacy: &PrivacyPolicy{Retention: ContentRetentionMetadata},
+	})
+	if err != nil {
+		t.Fatalf("Start returned error: %v", err)
+	}
+
+	_, err = tracer.Observe(context.Background(), Capture{
+		Action:   "runner.completed",
+		Category: "runner",
+		Input:    "SECRET input",
+		Output:   "SECRET output",
+	})
+	if err != nil {
+		t.Fatalf("Observe returned error: %v", err)
+	}
+	if err := tracer.Shutdown(context.Background()); err != nil {
+		t.Fatalf("Shutdown returned error: %v", err)
+	}
+
+	events, _, _ := sink.snapshot()
+	if len(events) != 1 {
+		t.Fatalf("events = %d, want 1", len(events))
+	}
+	rawText := stringify(events[0].Raw)
+	if strings.Contains(rawText, "SECRET") {
+		t.Fatalf("metadata privacy leaked input/output: %#v", events[0].Raw)
+	}
+	if events[0].Raw["field_count"] != 2 {
+		t.Fatalf("metadata raw = %#v, want field_count 2", events[0].Raw)
+	}
+}
+
+func TestTracerRecordsSinkErrorsInStats(t *testing.T) {
+	sink := &captureEventSink{writeErr: errors.New("write failed")}
+	tracer, err := Start(Options{Harness: "my-agent", Sink: sink})
+	if err != nil {
+		t.Fatalf("Start returned error: %v", err)
+	}
+
+	if _, err := tracer.Observe(context.Background(), Capture{Action: "runner.started"}); err != nil {
+		t.Fatalf("Observe returned error: %v", err)
+	}
+	if err := tracer.Flush(context.Background()); err != nil {
... diff truncated: showing 800 of 854 lines


Reviewed by Cursor Bugbot for commit 26ec6fe.

Comment thread pkg/asymptotetrace/tracer.go Outdated
When t.done closes after beginCommandSend() succeeds but before the
barrier command is enqueued, sendBarrier now returns t.shutdownErr (or
ErrTracerClosed) instead of nil, matching the error propagation pattern
used in the other t.done branches of the same function.

2 participants