Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 122 additions & 4 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cogito

import (
"context"
"errors"
"fmt"
"strings"
"sync"
Expand Down Expand Up @@ -48,6 +49,51 @@ type AgentDefinition struct {
MaxRetries int // optional per-type retry cap (0 = inherit parent)
}

// AgentRunSpec describes one sub-agent run in a portable, self-contained form.
//
// It holds what an out-of-process executor needs in order to reproduce the run
// that cogito would otherwise carry out in-process through ExecuteTools. The
// spec is purely the execution payload: lifecycle bookkeeping (registration,
// status, the done channel, callbacks, completion injection, detach) stays
// with cogito no matter where the run actually executes.
type AgentRunSpec struct {
	// ID is the registry ID that cogito assigned to the sub-agent.
	ID string
	// Type is the requested agent type name; it is empty for a generic agent.
	Type string
	// Task is the user task that drives the sub-agent.
	Task string
	// SystemPrompt is the resolved system prompt, taken from the agent
	// definition, or empty when there is none.
	SystemPrompt string
	// Model is the resolved model override; it may be empty.
	Model string
	// Temperature is the resolved sampling temperature; it may be 0.
	Temperature float32
	// Metadata is the per-request metadata; it may be nil.
	Metadata map[string]string
	// Tools is the allow-list of tool names available to this run.
	Tools []string
	// Background reports whether the agent was spawned in the background.
	Background bool
	// Emit, when non-nil, lets the executor stream progress back to the
	// embedder. cogito wires it so that tagged sub-agent StreamEvents reach the
	// parent stream callback. Executors are free to ignore it.
	Emit func(AgentEvent)
}

// AgentEvent is a transport-friendly progress event. An out-of-process
// executor emits it through AgentRunSpec.Emit, and cogito turns it into a
// tagged StreamEvent for the parent stream callback.
type AgentEvent struct {
	// AgentID is the registry ID of the sub-agent the event belongs to.
	AgentID string
	// Kind selects the event variant; one of: running | delta | done | error.
	Kind string
	// Delta carries incremental text (for kind=delta).
	Delta string
	// Result carries the terminal result text (for kind=done).
	Result string
	// Err carries the error message (for kind=error).
	Err string
}

// AgentDispatcher is the execution seam: when set via WithAgentDispatcher,
// cogito calls it instead of running the sub-agent in-process. It must return
// the sub-agent's final Fragment (whose last message content becomes the
// agent's Result).
//
// Returning ErrDispatchFallback makes cogito transparently fall back to the
// in-process ExecuteTools path; the check uses errors.Is, so an error that
// wraps ErrDispatchFallback triggers the fallback as well. Any other error
// marks the agent failed.
//
// The context governs the sub-agent's lifetime.
type AgentDispatcher func(ctx context.Context, spec AgentRunSpec) (Fragment, error)

// ErrDispatchFallback signals that an AgentDispatcher declined to handle a run
// and cogito should execute it in-process instead. It is a sentinel value:
// cogito tests for it with errors.Is, so dispatchers may return it directly or
// wrapped.
var ErrDispatchFallback = errors.New("cogito: dispatch fallback to in-process")

// findAgentDefinition returns the definition with the given name, or nil.
func findAgentDefinition(defs []AgentDefinition, name string) *AgentDefinition {
for i := range defs {
Expand Down Expand Up @@ -300,6 +346,59 @@ type spawnAgentRunner struct {
completionFormatter func(*AgentState) string
agentDefinitions []AgentDefinition
llmFactory func(model string, temperature float32, metadata map[string]string) LLM
// dispatcher, when non-nil, executes the sub-agent out-of-process instead
// of via in-process ExecuteTools. cogito retains all lifecycle bookkeeping
// either way (see runAgent).
dispatcher AgentDispatcher
}

// buildRunSpec produces the AgentRunSpec for a single sub-agent run.
//
// The spec is filled from the spawn arguments (agent type, task, model and
// background flag), the names of the resolved tool set, and the assigned
// registry ID. When a definition is supplied it contributes the system prompt,
// temperature and metadata; its model is used only if the spawn arguments left
// the model empty.
//
// If the runner has a stream callback, Emit is set to a closure that converts
// each AgentEvent into a StreamEventSubAgent StreamEvent and passes it to that
// callback. Without a stream callback Emit stays nil.
func (r *spawnAgentRunner) buildRunSpec(args SpawnAgentArgs, def *AgentDefinition, subTools Tools, agentID string) AgentRunSpec {
	var spec AgentRunSpec
	spec.ID = agentID
	spec.Type = args.AgentType
	spec.Task = args.Task
	spec.Tools = subTools.Names()
	spec.Background = args.Background
	spec.Model = args.Model

	if def != nil {
		spec.SystemPrompt = def.SystemPrompt
		spec.Temperature = def.Temperature
		spec.Metadata = def.Metadata
		// The spawn arguments win; the definition's model is only a default.
		if spec.Model == "" {
			spec.Model = def.Model
		}
	}

	// Capture the callback now so the closure keeps using the one that was
	// configured when the spec was built.
	forward := r.streamCB
	if forward == nil {
		return spec
	}

	spec.Emit = func(ev AgentEvent) {
		// Events that do not name an agent are attributed to this run.
		taggedID := agentID
		if ev.AgentID != "" {
			taggedID = ev.AgentID
		}

		out := StreamEvent{
			AgentID: taggedID,
			Type:    StreamEventSubAgent,
		}

		// Any other kind (e.g. "running") is forwarded with no payload.
		switch ev.Kind {
		case "error":
			out.Error = errors.New(ev.Err)
		case "done":
			out.Content = ev.Result
			out.FinishReason = "stop"
		case "delta":
			out.Content = ev.Delta
		}

		forward(out)
	}

	return spec
}

func (r *spawnAgentRunner) Run(args SpawnAgentArgs) (string, any, error) {
Expand Down Expand Up @@ -355,6 +454,10 @@ func (r *spawnAgentRunner) Run(args SpawnAgentArgs) (string, any, error) {
subLLM := r.resolveLLM(args, def)

agentID := uuid.New().String()
// Portable execution payload, used by an out-of-process dispatcher if one
// is configured. Built once here while def/args/subTools are in scope and
// shared by both the foreground and background branches.
runSpec := r.buildRunSpec(args, def, subTools, agentID)
// Decouple the sub-agent's lifetime from the parent turn's cancellation:
// once detached (or spawned in the background) the agent must keep running
// after the parent ExecuteTools call returns and the embedder cancels its
Expand Down Expand Up @@ -391,7 +494,7 @@ func (r *spawnAgentRunner) Run(args SpawnAgentArgs) (string, any, error) {
fgOpts = append(fgOpts, WithStreamCallback(r.streamCB))
}

go r.runAgent(agent, subLLM, subFragment, fgOpts, cancel)
go r.runAgent(agent, subLLM, subFragment, fgOpts, runSpec, subCtx, cancel)

select {
case <-agent.done:
Expand Down Expand Up @@ -448,7 +551,7 @@ func (r *spawnAgentRunner) Run(args SpawnAgentArgs) (string, any, error) {
// Override context for sub-agent.
bgOpts = append(bgOpts, WithContext(subCtx))

go r.runAgent(agent, subLLM, subFragment, bgOpts, cancel)
go r.runAgent(agent, subLLM, subFragment, bgOpts, runSpec, subCtx, cancel)

return fmt.Sprintf("Agent spawned in background with ID: %s", agentID), agentID, nil
}
Expand All @@ -457,11 +560,24 @@ func (r *spawnAgentRunner) Run(args SpawnAgentArgs) (string, any, error) {
// firing the completion callback and injecting a completion notification into
// the parent loop. Shared by the foreground (detachable) and background spawn
// branches so the lifecycle bookkeeping lives in one place.
func (r *spawnAgentRunner) runAgent(agent *AgentState, llm LLM, frag Fragment, opts []Option, cancel context.CancelFunc) {
func (r *spawnAgentRunner) runAgent(agent *AgentState, llm LLM, frag Fragment, opts []Option, spec AgentRunSpec, ctx context.Context, cancel context.CancelFunc) {
defer close(agent.done)
defer cancel()

result, err := ExecuteTools(llm, frag, opts...)
var (
result Fragment
err error
)
if r.dispatcher != nil {
// Out-of-process execution. The dispatcher governs execution but cogito
// still owns all lifecycle bookkeeping below.
result, err = r.dispatcher(ctx, spec)
if errors.Is(err, ErrDispatchFallback) {
result, err = ExecuteTools(llm, frag, opts...)
}
} else {
result, err = ExecuteTools(llm, frag, opts...)
}

r.manager.mu.Lock()
if err != nil {
Expand Down Expand Up @@ -593,6 +709,7 @@ func newSpawnAgentTool(
completionFormatter func(*AgentState) string,
defs []AgentDefinition,
llmFactory func(model string, temperature float32, metadata map[string]string) LLM,
dispatcher AgentDispatcher,
) ToolDefinitionInterface {
return NewToolDefinition(
&spawnAgentRunner{
Expand All @@ -608,6 +725,7 @@ func newSpawnAgentTool(
completionFormatter: completionFormatter,
agentDefinitions: defs,
llmFactory: llmFactory,
dispatcher: dispatcher,
},
SpawnAgentArgs{},
"spawn_agent",
Expand Down
Loading
Loading