Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions agent_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,12 @@ func TestExecuteTools_ParksWhilePendingWork(t *testing.T) {
}

// TestExecuteTools_OnParkOnResumeFire proves that WithOnPark fires immediately
// before the loop blocks on the injection channel at a park gate, and
// WithOnResume fires immediately after an injected message wakes it — and that
// onPark is observed before onResume. While parked, onPark must have fired but
// onResume must NOT yet; after the predicate flips false and a message is
// injected, the loop resumes/returns and onResume must have fired.
// before the loop blocks on the injection channel at a park gate — carrying the
// assistant reply text that preceded the park — and WithOnResume fires
// immediately after an injected message wakes it, with onPark observed before
// onResume. While parked, onPark must have fired but onResume must NOT yet;
// after the predicate flips false and a message is injected, the loop
// resumes/returns and onResume must have fired.
func TestExecuteTools_OnParkOnResumeFire(t *testing.T) {
ch := make(chan openai.ChatCompletionMessage, 1)
var pending atomic.Bool
Expand All @@ -162,12 +163,16 @@ func TestExecuteTools_OnParkOnResumeFire(t *testing.T) {
// whether onPark had already fired — used to assert ordering.
var parkBeforeResume atomic.Bool
parkBeforeResume.Store(true)
var parkReply atomic.Value // first parked reply text
done := make(chan struct{})
go func() {
_, _ = ExecuteTools(noToolMockLLM{}, NewEmptyFragment().AddMessage("user", "hi"),
WithMessageInjectionChan(ch),
WithPendingWork(func() bool { return pending.Load() }),
WithOnPark(func() { parks.Add(1) }),
WithOnPark(func(reply string) {
parkReply.CompareAndSwap(nil, reply)
parks.Add(1)
}),
WithOnResume(func() {
if parks.Load() == 0 {
parkBeforeResume.Store(false)
Expand Down Expand Up @@ -206,4 +211,9 @@ func TestExecuteTools_OnParkOnResumeFire(t *testing.T) {
if !parkBeforeResume.Load() {
t.Fatal("onPark must fire before onResume")
}
// The park gate must carry the no-tool reply the model produced right
// before blocking — the embedder surfaces it as the parked reply.
if got, _ := parkReply.Load().(string); got != "sub-agent done" {
t.Fatalf("onPark reply = %q, want %q", got, "sub-agent done")
}
}
17 changes: 11 additions & 6 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,11 @@ type Options struct {
pendingWork func() bool

// onPark, when set, fires immediately before the loop blocks on the
// message-injection channel at a park gate. onResume, when set, fires
// immediately after an injected message wakes the loop at a park gate.
onPark func()
// message-injection channel at a park gate; it receives the assistant
// reply text that preceded the park ("" when the model produced none).
// onResume, when set, fires immediately after an injected message wakes
// the loop at a park gate.
onPark func(reply string)
onResume func()

// TODO-based iterative execution options
Expand Down Expand Up @@ -427,12 +429,15 @@ func WithPendingWork(fn func() bool) Option { return func(o *Options) { o.pendin
// WithOnPark registers a callback fired immediately BEFORE the loop blocks on
// the message-injection channel at a park gate (i.e. when background work —
// cogito's own running agents or an embedder's WithPendingWork predicate — is
// still pending). An embedder can use this to finalize the current assistant
// turn the instant the loop parks.
// still pending). The callback receives the assistant reply text that preceded
// the park — the no-tool text reply recorded in the fragment just before the
// loop blocked — or "" when the model produced none (e.g. a sink-state park).
// An embedder can use this to surface the parked reply and finalize the
// current assistant turn the instant the loop parks.
//
// Across a single run the loop may park and resume multiple times (e.g. several
// injected messages), so onPark may fire multiple times — that is expected.
func WithOnPark(fn func()) Option { return func(o *Options) { o.onPark = fn } }
func WithOnPark(fn func(reply string)) Option { return func(o *Options) { o.onPark = fn } }

// WithOnResume registers a callback fired immediately AFTER an injected message
// wakes the loop at a park gate (the resume path). It does NOT fire when the
Expand Down
8 changes: 6 additions & 2 deletions tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -1442,7 +1442,9 @@ TOOL_LOOP:
if (o.agentManager != nil && o.agentManager.HasRunning()) || (o.pendingWork != nil && o.pendingWork()) {
xlog.Debug("No tool selected but background agents still running, blocking for completions")
if o.onPark != nil {
o.onPark()
// reasoning holds the no-tool text reply recorded in the
// fragment above — the parked reply the embedder surfaces.
o.onPark(reasoning)
}
select {
case <-o.context.Done():
Expand Down Expand Up @@ -1557,7 +1559,9 @@ TOOL_LOOP:
xlog.Debug("Sink state selected but background agents still running, blocking for completions")
hasSinkState = false // Reset so we re-enter the loop
if o.onPark != nil {
o.onPark()
// Sink-state park: the reply is produced by the sink state
// after the loop, so there is no parked reply text yet.
o.onPark("")
}
select {
case <-o.context.Done():
Expand Down
Loading