From c76628b2a650ead570a4e08d73c4d8dd7092725a Mon Sep 17 00:00:00 2001 From: Lightblues Date: Sat, 17 Jan 2026 16:39:33 +0800 Subject: [PATCH 1/2] upgrade to openai-agent=0.6.4 --- pyproject.toml | 2 +- uv.lock | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 53fff5e0..4f040e66 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,7 +21,7 @@ dependencies = [ "jinja2>=3.1.6", "mcp>=1.12.3", # https://github.com/openai/openai-agents-python/issues/1395 | openai==1.97.1 & openai-agents==0.2.5 - "openai-agents==0.5.0", + "openai-agents==0.6.4", # https://github.com/Arize-ai/openinference/tree/main/python/instrumentation/openinference-instrumentation-openai "openinference-instrumentation-openai>=0.1.30", # https://opentelemetry.io/docs/languages/python/ diff --git a/uv.lock b/uv.lock index ecd4f2fa..c5b754b1 100644 --- a/uv.lock +++ b/uv.lock @@ -3553,7 +3553,7 @@ wheels = [ [[package]] name = "openai" -version = "2.7.1" +version = "2.15.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "anyio" }, @@ -3565,14 +3565,14 @@ dependencies = [ { name = "tqdm" }, { name = "typing-extensions" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/51/a2/f4023c1e0c868a6a5854955b3374f17153388aed95e835af114a17eac95b/openai-2.7.1.tar.gz", hash = "sha256:df4d4a3622b2df3475ead8eb0fbb3c27fd1c070fa2e55d778ca4f40e0186c726", size = 595933, upload-time = "2025-11-04T06:07:23.069Z" } +sdist = { url = "https://files.pythonhosted.org/packages/94/f4/4690ecb5d70023ce6bfcfeabfe717020f654bde59a775058ec6ac4692463/openai-2.15.0.tar.gz", hash = "sha256:42eb8cbb407d84770633f31bf727d4ffb4138711c670565a41663d9439174fba", size = 627383, upload-time = "2026-01-09T22:10:08.603Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/8c/74/6bfc3adc81f6c2cea4439f2a734c40e3a420703bbcdc539890096a732bbd/openai-2.7.1-py3-none-any.whl", hash = "sha256:2f2530354d94c59c614645a4662b9dab0a5b881c5cd767a8587398feac0c9021", size = 1008780, upload-time = "2025-11-04T06:07:20.818Z" }, + { url = "https://files.pythonhosted.org/packages/b5/df/c306f7375d42bafb379934c2df4c2fa3964656c8c782bac75ee10c102818/openai-2.15.0-py3-none-any.whl", hash = "sha256:6ae23b932cd7230f7244e52954daa6602716d6b9bf235401a107af731baea6c3", size = 1067879, upload-time = "2026-01-09T22:10:06.446Z" }, ] [[package]] name = "openai-agents" -version = "0.5.0" +version = "0.6.4" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "griffe" }, @@ -3583,9 +3583,9 @@ dependencies = [ { name = "types-requests" }, { name = "typing-extensions" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/00/b3/c25c3b3084c113ffbd161c37ed7c02e0cc1296eb2f2d582d85c461f60c7a/openai_agents-0.5.0.tar.gz", hash = "sha256:776dde4025442164e3e860ff5b239b5c0ebc30f9445b0d75295c385a8ca1f696", size = 1958702, upload-time = "2025-11-05T05:28:37.456Z" } +sdist = { url = "https://files.pythonhosted.org/packages/13/36/826ce8ad497904a1becdedef80326c2fe932c754646b77405a8dc4cd49f7/openai_agents-0.6.4.tar.gz", hash = "sha256:07836865ed9c37946523d44b2d87ad375673b6558e783fa086db004a892331ec", size = 2022961, upload-time = "2025-12-19T06:42:55.356Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/d5/f5/c43a84a64aa3328c628cc19365dc514ce02abf31e31c861ad489d6d3075b/openai_agents-0.5.0-py3-none-any.whl", hash = "sha256:5ef062273815de197315ec760f571625d0f2766ceb83ab189ba6cdd9b26a10e9", size = 223272, upload-time = "2025-11-05T05:28:35.64Z" }, + { url = "https://files.pythonhosted.org/packages/2b/ed/f2282debf62c52241959b111d2e35105b43b2ea6ff060833734405bb4d0f/openai_agents-0.6.4-py3-none-any.whl", hash = "sha256:d14635c1fa0ee39e79b81e5cab2f22dd5024772d3dbc0770d8307fd2548b3951", size = 241982, upload-time = "2025-12-19T06:42:53.92Z" }, ] [[package]] @@ -7075,7 +7075,7 @@ requires-dist = [ { name = "matplotlib", marker = "extra == 'local-python'", specifier = ">=3.10.6" }, { name = "matplotlib", marker = "extra == 'ppt-gen'", specifier = ">=3.10.6" }, { name = "mcp", specifier = ">=1.12.3" }, - { name = "openai-agents", specifier = "==0.5.0" }, + { name = "openai-agents", specifier = "==0.6.4" }, { name = "openinference-instrumentation-openai", specifier = ">=0.1.30" }, { name = "openpyxl", marker = "extra == 'documents'", specifier = ">=3.1.5" }, { name = "opentelemetry-exporter-otlp", specifier = ">=1.37.0" }, From 4daa083cb491eb788f613684b737039cd93ed14a Mon Sep 17 00:00:00 2001 From: Lightblues Date: Sat, 17 Jan 2026 16:42:21 +0800 Subject: [PATCH 2/2] feat: add customized agent-runner --- docs/configs/agent_simple.yaml | 2 + utu/__init__.py | 3 - utu/agents/simple_agent.py | 12 +- utu/config/agent_config.py | 2 + utu/runner/__init__.py | 27 + .../runner.py => runner/openai_runner.py} | 134 ++-- utu/runner/react_runner.py | 596 ++++++++++++++++++ 7 files changed, 686 insertions(+), 90 deletions(-) create mode 100644 utu/runner/__init__.py rename utu/{patch/runner.py => runner/openai_runner.py} (85%) create mode 100644 utu/runner/react_runner.py diff --git a/docs/configs/agent_simple.yaml b/docs/configs/agent_simple.yaml index c3b07d38..47561f04 100644 --- a/docs/configs/agent_simple.yaml +++ b/docs/configs/agent_simple.yaml @@ -9,6 +9,8 @@ type: simple # agent type, support `simple | orchestrator` for now max_turns: 50 # max number of turns +runner: openai # `openai | react` you can choose customized agent Runner + # ------------------------------------------------------------------------------------------- # model configs model: diff --git a/utu/__init__.py b/utu/__init__.py index be861347..c6458783 100644 --- a/utu/__init__.py +++ b/utu/__init__.py @@ -2,11 +2,8 @@ from agents.run import set_default_agent_runner from .utils import EnvUtils, setup_logging -from .patch.runner import UTUAgentRunner from .tracing import setup_tracing EnvUtils.assert_env(["UTU_LLM_TYPE", "UTU_LLM_MODEL"]) setup_logging(EnvUtils.get_env("UTU_LOG_LEVEL", "WARNING")) setup_tracing() -# patched runner -set_default_agent_runner(UTUAgentRunner()) diff --git a/utu/agents/simple_agent.py b/utu/agents/simple_agent.py index 5f67cbde..553851a1 100644 --- a/utu/agents/simple_agent.py +++ b/utu/agents/simple_agent.py @@ -11,7 +11,6 @@ ModelSettings, RunConfig, RunHooks, - Runner, StopAtTools, TContext, Tool, @@ -25,6 +24,7 @@ from ..db import DBService, TrajectoryModel from ..env import _BaseEnv, get_env from ..hooks import get_run_hooks +from ..runner import get_runner from ..tools import TOOLKIT_MAP, AsyncBaseToolkit from ..utils import AgentsMCPUtils, AgentsUtils, get_logger, load_class_from_file from .common import QueueCompleteSentinel, TaskRecorder @@ -260,11 +260,12 @@ async def run( if isinstance(input, str): # only add history when input is str? input = self.input_items + [{"content": input, "role": "user"}] run_kwargs = self._prepare_run_kwargs(input) + runner = get_runner(self.config.runner) if AgentsUtils.get_current_trace(): - run_result = await Runner.run(**run_kwargs) + run_result = await runner.run(**run_kwargs) else: with trace(workflow_name="simple_agent", trace_id=recorder.trace_id): - run_result = await Runner.run(**run_kwargs) + run_result = await runner.run(**run_kwargs) # save final output and trajectory recorder.add_run_result(run_result) if save: @@ -304,11 +305,12 @@ async def _start_streaming(self, recorder: TaskRecorder, save: bool = False, log if isinstance(input, str): # only add history when input is str? input = self.input_items + [{"content": input, "role": "user"}] run_kwargs = self._prepare_run_kwargs(input) + runner = get_runner(self.config.runner) if AgentsUtils.get_current_trace(): - run_streamed_result = Runner.run_streamed(**run_kwargs) + run_streamed_result = runner.run_streamed(**run_kwargs) else: with trace(workflow_name="simple_agent", trace_id=recorder.trace_id): - run_streamed_result = Runner.run_streamed(**run_kwargs) + run_streamed_result = runner.run_streamed(**run_kwargs) async for event in run_streamed_result.stream_events(): recorder._event_queue.put_nowait(event) # save final output and trajectory diff --git a/utu/config/agent_config.py b/utu/config/agent_config.py index b804b790..e066ea16 100644 --- a/utu/config/agent_config.py +++ b/utu/config/agent_config.py @@ -70,6 +70,8 @@ class AgentConfig(ConfigBaseModel): """Max turns for simple agent. This param is derived from @openai-agents""" stop_at_tool_names: list[str] | None = None """Stop at tools for simple agent. This param is derived from @openai-agents""" + runner: Literal["openai", "react"] = "openai" + """Runner name for simple agent.""" # orchestra agent config planner_model: ModelConfigs = Field(default_factory=ModelConfigs) diff --git a/utu/runner/__init__.py b/utu/runner/__init__.py new file mode 100644 index 00000000..bbf0a85e --- /dev/null +++ b/utu/runner/__init__.py @@ -0,0 +1,27 @@ +from typing import Literal + +from agents import RunConfig, Runner + +from .openai_runner import UTUAgentRunner +from .react_runner import ReactRunner + + +def get_runner(name: Literal["openai", "react"] = "openai") -> object: + """Get a runner class by name. + + Args: + name: Runner name ("openai" for default, "react" for ReactRunner) + + Returns: + Runner class (not instance) + """ + # TODO: add a protocol for runner + if name == "react": + return ReactRunner + elif name == "openai": + return UTUAgentRunner() + else: + raise ValueError(f"Unknown runner name: {name}") + + +__all__ = ["Runner", "RunConfig", "ReactRunner", "get_runner"] diff --git a/utu/patch/runner.py b/utu/runner/openai_runner.py similarity index 85% rename from utu/patch/runner.py rename to utu/runner/openai_runner.py index aa9601e9..a4604c8b 100644 --- a/utu/patch/runner.py +++ b/utu/runner/openai_runner.py @@ -7,8 +7,7 @@ import asyncio import logging -from typing import cast -from typing_extensions import Unpack +from typing import Unpack, cast from agents import ( Agent, @@ -22,22 +21,33 @@ TResponseInputItem, ) from agents._run_impl import ( - RunImpl, NextStepHandoff, TraceCtxManager, NextStepFinalOutput, NextStepRunAgain, - get_model_tracing_impl, + NextStepFinalOutput, + NextStepHandoff, + NextStepRunAgain, + RunImpl, + TraceCtxManager, + get_model_tracing_impl, ) -from agents.exceptions import ModelBehaviorError, MaxTurnsExceeded, AgentsException, RunErrorDetails +from agents.exceptions import AgentsException, MaxTurnsExceeded, ModelBehaviorError, RunErrorDetails from agents.guardrail import InputGuardrailResult -from agents.items import HandoffCallItem, ModelResponse, ToolCallItem, ToolCallItemTypes, ReasoningItem +from agents.items import HandoffCallItem, ModelResponse, ReasoningItem, ToolCallItem, ToolCallItemTypes from agents.result import RunResult from agents.run import ( - AgentRunner, AgentToolUseTracker, RunResultStreaming, SingleStepResult, RunOptions, - _TOOL_CALL_TYPES, _ServerConversationTracker, _copy_str_or_list, DEFAULT_MAX_TURNS + _TOOL_CALL_TYPES, + DEFAULT_MAX_TURNS, + AgentRunner, + AgentToolUseTracker, + RunOptions, + RunResultStreaming, + SingleStepResult, + _copy_str_or_list, + _ServerConversationTracker, ) from agents.stream_events import RawResponsesStreamEvent, RunItemStreamEvent from agents.tool_guardrails import ToolInputGuardrailResult, ToolOutputGuardrailResult -from agents.tracing import Span, AgentSpanData, SpanError, agent_span -from agents.util import _coro, _error_tracing +from agents.tracing import AgentSpanData, Span, SpanError, agent_span from agents.usage import Usage +from agents.util import _coro, _error_tracing from openai.types.responses import ( ResponseCompletedEvent, ResponseFunctionToolCall, @@ -53,7 +63,6 @@ class UTUAgentRunner(AgentRunner): - async def run( self, starting_agent: Agent[TContext], @@ -79,9 +88,7 @@ async def run( # Keep original user input separate from session-prepared input original_user_input = input - prepared_input = await self._prepare_input_with_session( - input, session, run_config.session_input_callback - ) + prepared_input = await self._prepare_input_with_session(input, session, run_config.session_input_callback) tool_use_tracker = AgentToolUseTracker() @@ -120,8 +127,7 @@ async def run( # agent changes, or if the agent loop ends. if current_span is None: handoff_names = [ - h.agent_name - for h in await AgentRunner._get_handoffs(current_agent, context_wrapper) + h.agent_name for h in await AgentRunner._get_handoffs(current_agent, context_wrapper) ] if output_schema := AgentRunner._get_output_schema(current_agent): output_type_name = output_schema.name() @@ -152,17 +158,18 @@ async def run( ) # ADD: inject context infos if isinstance(context_wrapper.context, dict): - context_wrapper.context.update({ - "current_turn": current_turn, - "max_turns": max_turns, - }) + context_wrapper.context.update( + { + "current_turn": current_turn, + "max_turns": max_turns, + } + ) if current_turn == 1: input_guardrail_results, turn_result = await asyncio.gather( self._run_input_guardrails( starting_agent, - starting_agent.input_guardrails - + (run_config.input_guardrails or []), + starting_agent.input_guardrails + (run_config.input_guardrails or []), _copy_str_or_list(prepared_input), context_wrapper, ), @@ -225,12 +232,9 @@ async def run( context_wrapper=context_wrapper, ) if not any( - guardrail_result.output.tripwire_triggered - for guardrail_result in input_guardrail_results + guardrail_result.output.tripwire_triggered for guardrail_result in input_guardrail_results ): - await self._save_result_to_session( - session, [], turn_result.new_step_items - ) + await self._save_result_to_session(session, [], turn_result.new_step_items) return result elif isinstance(turn_result.next_step, NextStepHandoff): @@ -240,16 +244,11 @@ async def run( should_run_agent_start_hooks = True elif isinstance(turn_result.next_step, NextStepRunAgain): if not any( - guardrail_result.output.tripwire_triggered - for guardrail_result in input_guardrail_results + guardrail_result.output.tripwire_triggered for guardrail_result in input_guardrail_results ): - await self._save_result_to_session( - session, [], turn_result.new_step_items - ) + await self._save_result_to_session(session, [], turn_result.new_step_items) else: - raise AgentsException( - f"Unknown next step type: {type(turn_result.next_step)}" - ) + raise AgentsException(f"Unknown next step type: {type(turn_result.next_step)}") except AgentsException as exc: exc.run_data = RunErrorDetails( input=original_input, @@ -284,11 +283,7 @@ async def _run_single_turn_streamed( if should_run_agent_start_hooks: await asyncio.gather( hooks.on_agent_start(context_wrapper, agent), - ( - agent.hooks.on_start(context_wrapper, agent) - if agent.hooks - else _coro.noop_coroutine() - ), + (agent.hooks.on_start(context_wrapper, agent) if agent.hooks else _coro.noop_coroutine()), ) output_schema = cls._get_output_schema(agent) @@ -309,20 +304,20 @@ async def _run_single_turn_streamed( final_response: ModelResponse | None = None if server_conversation_tracker is not None: - input = server_conversation_tracker.prepare_input( - streamed_result.input, streamed_result.new_items - ) + input = server_conversation_tracker.prepare_input(streamed_result.input, streamed_result.new_items) else: input = ItemHelpers.input_to_new_input_list(streamed_result.input) input.extend([item.to_input_item() for item in streamed_result.new_items]) # ADD: inject context infos if isinstance(context_wrapper.context, dict): - context_wrapper.context.update({ - "current_turn": streamed_result.current_turn, - "max_turns": streamed_result.max_turns, - # "streamed_result": streamed_result - }) + context_wrapper.context.update( + { + "current_turn": streamed_result.current_turn, + "max_turns": streamed_result.max_turns, + # "streamed_result": streamed_result + } + ) input = cls._context_manager_preprocess(input, context_wrapper) # THIS IS THE RESOLVED CONFLICT BLOCK @@ -338,22 +333,14 @@ async def _run_single_turn_streamed( await asyncio.gather( hooks.on_llm_start(context_wrapper, agent, filtered.instructions, filtered.input), ( - agent.hooks.on_llm_start( - context_wrapper, agent, filtered.instructions, filtered.input - ) + agent.hooks.on_llm_start(context_wrapper, agent, filtered.instructions, filtered.input) if agent.hooks else _coro.noop_coroutine() ), ) - previous_response_id = ( - server_conversation_tracker.previous_response_id - if server_conversation_tracker - else None - ) - conversation_id = ( - server_conversation_tracker.conversation_id if server_conversation_tracker else None - ) + previous_response_id = server_conversation_tracker.previous_response_id if server_conversation_tracker else None + conversation_id = server_conversation_tracker.conversation_id if server_conversation_tracker else None # 1. Stream the output events async for event in model.stream_response( @@ -363,9 +350,7 @@ async def _run_single_turn_streamed( all_tools, output_schema, handoffs, - get_model_tracing_impl( - run_config.tracing_disabled, run_config.trace_include_sensitive_data - ), + get_model_tracing_impl(run_config.tracing_disabled, run_config.trace_include_sensitive_data), previous_response_id=previous_response_id, conversation_id=conversation_id, prompt=prompt_config, @@ -397,9 +382,7 @@ async def _run_single_turn_streamed( output_item = event.item if isinstance(output_item, _TOOL_CALL_TYPES): - call_id: str | None = getattr( - output_item, "call_id", getattr(output_item, "id", None) - ) + call_id: str | None = getattr(output_item, "call_id", getattr(output_item, "id", None)) if call_id and call_id not in emitted_tool_call_ids: emitted_tool_call_ids.add(call_id) @@ -408,9 +391,7 @@ async def _run_single_turn_streamed( raw_item=cast(ToolCallItemTypes, output_item), agent=agent, ) - streamed_result._event_queue.put_nowait( - RunItemStreamEvent(item=tool_item, name="tool_called") - ) + streamed_result._event_queue.put_nowait(RunItemStreamEvent(item=tool_item, name="tool_called")) elif isinstance(output_item, ResponseReasoningItem): reasoning_id: str | None = getattr(output_item, "id", None) @@ -469,11 +450,7 @@ async def _run_single_turn_streamed( for item in items_to_filter if not ( isinstance(item, ToolCallItem) - and ( - call_id := getattr( - item.raw_item, "call_id", getattr(item.raw_item, "id", None) - ) - ) + and (call_id := getattr(item.raw_item, "call_id", getattr(item.raw_item, "id", None))) and call_id in emitted_tool_call_ids ) ] @@ -491,9 +468,7 @@ async def _run_single_turn_streamed( ] # Filter out HandoffCallItem to avoid duplicates (already sent earlier) - items_to_filter = [ - item for item in items_to_filter if not isinstance(item, HandoffCallItem) - ] + items_to_filter = [item for item in items_to_filter if not isinstance(item, HandoffCallItem)] # Create filtered result and send to queue filtered_result = _dc.replace(single_step_result, new_step_items=items_to_filter) @@ -519,11 +494,7 @@ async def _run_single_turn( if should_run_agent_start_hooks: await asyncio.gather( hooks.on_agent_start(context_wrapper, agent), - ( - agent.hooks.on_start(context_wrapper, agent) - if agent.hooks - else _coro.noop_coroutine() - ), + (agent.hooks.on_start(context_wrapper, agent) if agent.hooks else _coro.noop_coroutine()), ) system_prompt, prompt_config = await asyncio.gather( @@ -574,7 +545,6 @@ async def _run_single_turn( tool_use_tracker=tool_use_tracker, ) - @classmethod def _context_manager_preprocess( cls, input: list[TResponseInputItem], context_wrapper: RunContextWrapper[TContext] diff --git a/utu/runner/react_runner.py b/utu/runner/react_runner.py new file mode 100644 index 00000000..293d47ca --- /dev/null +++ b/utu/runner/react_runner.py @@ -0,0 +1,596 @@ +"""ReactRunner: Custom agent runner implementing the ReAct (Reasoning-Acting) pattern. + +This module provides a custom agent execution loop that offers more flexibility than the +default openai-agents Runner, including configurable max-turns handling and future +extensibility for context management. +""" + +from __future__ import annotations + +import asyncio +import inspect +from dataclasses import dataclass +from typing import TYPE_CHECKING, Literal, cast + +from agents import Agent, RunConfig, RunResultStreaming, Session +from agents._run_impl import ( + AgentToolUseTracker, + NextStepFinalOutput, + NextStepHandoff, + NextStepRunAgain, + QueueCompleteSentinel, + RunImpl, + SingleStepResult, + get_model_tracing_impl, +) +from agents.agent_output import AgentOutputSchema, AgentOutputSchemaBase +from agents.exceptions import ModelBehaviorError, UserError +from agents.handoffs import Handoff, handoff +from agents.items import HandoffCallItem, ItemHelpers, ModelResponse, RunItem, TResponseInputItem +from agents.lifecycle import RunHooks +from agents.models.interface import Model +from agents.run import DEFAULT_MAX_TURNS, CallModelData, ModelInputData +from agents.run_context import RunContextWrapper, TContext +from agents.stream_events import AgentUpdatedStreamEvent, RawResponsesStreamEvent +from agents.tool import Tool +from agents.tracing import SpanError, agent_span, get_current_trace, trace +from agents.tracing.span_data import AgentSpanData +from agents.usage import Usage +from agents.util import _coro, _error_tracing +from openai.types.responses import ResponseCompletedEvent + +if TYPE_CHECKING: + from agents.tracing import Span + + +@dataclass +class ReactRunnerConfig: + """Configuration options for ReactRunner.""" + + max_turns: int = DEFAULT_MAX_TURNS + on_max_turns: Literal["error", "reply"] = "error" + """Behavior when max turns is exceeded: + - "error": Raise MaxTurnsExceeded (default, SDK-compatible) + - "reply": Remove tools and generate a final reply + """ + + +class ReactRunner: + """Custom agent runner implementing the ReAct (Reasoning-Acting) pattern. + + This runner provides: + - Streaming-only execution via `run_streamed` + - Full hooks integration for lifecycle callbacks + - Tracing integration with openai-agents + - Configurable max-turns handling + - Core ReAct loop: think → act → observe → repeat + + Usage: + result = ReactRunner.run_streamed(agent, input, max_turns=10) + async for event in result.stream_events(): + process(event) + """ + + @classmethod + def run_streamed( + cls, + starting_agent: Agent[TContext], + input: str | list[TResponseInputItem], + context: TContext | None = None, + max_turns: int = DEFAULT_MAX_TURNS, + hooks: RunHooks[TContext] | None = None, + run_config: RunConfig | None = None, + session: Session | None = None, + **kwargs, + ) -> RunResultStreaming: + """Run an agent with streaming output. + + Args: + starting_agent: The agent to run. + input: User input (string or list of input items). + context: Optional context object passed to tools and hooks. + max_turns: Maximum number of turns before stopping. + hooks: Lifecycle hooks for the run. + run_config: Run configuration (tracing, model settings, etc). + session: Optional session for conversation history persistence. + + Returns: + RunResultStreaming object that can be used to stream events. + """ + print(">> using ReactRunner") + if run_config is None: + run_config = RunConfig() + if hooks is None: + hooks = RunHooks[TContext]() + + # Create trace if not already in one + new_trace = ( + None + if get_current_trace() + else trace( + workflow_name=run_config.workflow_name, + trace_id=run_config.trace_id, + group_id=run_config.group_id, + metadata=run_config.trace_metadata, + disabled=run_config.tracing_disabled, + ) + ) + + output_schema = cls._get_output_schema(starting_agent) + context_wrapper: RunContextWrapper[TContext] = RunContextWrapper(context=context) + + streamed_result = RunResultStreaming( + input=cls._copy_input(input), + new_items=[], + current_agent=starting_agent, + raw_responses=[], + final_output=None, + is_complete=False, + current_turn=0, + max_turns=max_turns, + input_guardrail_results=[], + output_guardrail_results=[], + tool_input_guardrail_results=[], + tool_output_guardrail_results=[], + _current_agent_output_schema=output_schema, + trace=new_trace, + context_wrapper=context_wrapper, + ) + + # Start the streaming loop in background + streamed_result._run_impl_task = asyncio.create_task( + cls._streaming_loop( + starting_input=input, + streamed_result=streamed_result, + starting_agent=starting_agent, + max_turns=max_turns, + hooks=hooks, + context_wrapper=context_wrapper, + run_config=run_config, + session=session, + ) + ) + return streamed_result + + @classmethod + async def _streaming_loop( + cls, + starting_input: str | list[TResponseInputItem], + streamed_result: RunResultStreaming, + starting_agent: Agent[TContext], + max_turns: int, + hooks: RunHooks[TContext], + context_wrapper: RunContextWrapper[TContext], + run_config: RunConfig, + session: Session | None = None, + ) -> None: + """Core ReAct loop: think → act → observe → repeat.""" + if streamed_result.trace: + streamed_result.trace.start(mark_as_current=True) + + current_span: Span[AgentSpanData] | None = None + current_agent = starting_agent + current_turn = 0 + should_run_agent_start_hooks = True + tool_use_tracker = AgentToolUseTracker() + + # Emit initial agent event + streamed_result._event_queue.put_nowait(AgentUpdatedStreamEvent(new_agent=current_agent)) + + try: + # Prepare input with session if enabled + prepared_input = await cls._prepare_input_with_session( + starting_input, session, run_config.session_input_callback + ) + streamed_result.input = prepared_input + + # Save original input to session (empty new_items at start) + await cls._save_result_to_session(session, starting_input, []) + + while True: + # Check for cancellation + if streamed_result._cancel_mode == "after_turn": + streamed_result.is_complete = True + streamed_result._event_queue.put_nowait(QueueCompleteSentinel()) + break + + if streamed_result.is_complete: + break + + # Get all tools for the current agent + all_tools = await cls._get_all_tools(current_agent, context_wrapper) + # await RunImpl.initialize_computer_tools(tools=all_tools, context_wrapper=context_wrapper) + + # Start agent span if not already started + if current_span is None: + handoff_names = [h.agent_name for h in await cls._get_handoffs(current_agent, context_wrapper)] + output_type_name = cls._get_output_schema(current_agent) + output_type_name = output_type_name.name() if output_type_name else "str" + + current_span = agent_span( + name=current_agent.name, + handoffs=handoff_names, + output_type=output_type_name, + ) + current_span.start(mark_as_current=True) + current_span.span_data.tools = [t.name for t in all_tools] + + current_turn += 1 + streamed_result.current_turn = current_turn + + # Check max turns - if exceeded, attach error to span and break + # The MaxTurnsExceeded exception is raised by RunResultStreaming._check_errors() + # when consumer calls stream_events() + if current_turn > max_turns: + _error_tracing.attach_error_to_span( + current_span, + SpanError( + message="Max turns exceeded", + data={"max_turns": max_turns}, + ), + ) + streamed_result._event_queue.put_nowait(QueueCompleteSentinel()) + break + + # Run single turn + turn_result = await cls._run_single_turn_streamed( + streamed_result=streamed_result, + agent=current_agent, + hooks=hooks, + context_wrapper=context_wrapper, + run_config=run_config, + should_run_agent_start_hooks=should_run_agent_start_hooks, + tool_use_tracker=tool_use_tracker, + all_tools=all_tools, + ) + should_run_agent_start_hooks = False + + # Update result state + streamed_result.raw_responses = streamed_result.raw_responses + [turn_result.model_response] + streamed_result.input = turn_result.original_input + streamed_result.new_items = turn_result.generated_items + + # Handle next step + if isinstance(turn_result.next_step, NextStepHandoff): + # Save to session before handoff + await cls._save_result_to_session(session, [], turn_result.new_step_items) + + current_agent = cast(Agent[TContext], turn_result.next_step.new_agent) + current_span.finish(reset_current=True) + current_span = None + should_run_agent_start_hooks = True + streamed_result._event_queue.put_nowait(AgentUpdatedStreamEvent(new_agent=current_agent)) + + if streamed_result._cancel_mode == "after_turn": + streamed_result.is_complete = True + streamed_result._event_queue.put_nowait(QueueCompleteSentinel()) + break + + elif isinstance(turn_result.next_step, NextStepFinalOutput): + # Save to session before completing + await cls._save_result_to_session(session, [], turn_result.new_step_items) + + streamed_result.final_output = turn_result.next_step.output + streamed_result.is_complete = True + streamed_result._event_queue.put_nowait(QueueCompleteSentinel()) + + elif isinstance(turn_result.next_step, NextStepRunAgain): + # Save to session before next turn + await cls._save_result_to_session(session, [], turn_result.new_step_items) + + if streamed_result._cancel_mode == "after_turn": + streamed_result.is_complete = True + streamed_result._event_queue.put_nowait(QueueCompleteSentinel()) + break + + streamed_result.is_complete = True + except Exception: + streamed_result.is_complete = True + streamed_result._event_queue.put_nowait(QueueCompleteSentinel()) + raise + finally: + if current_span: + current_span.finish(reset_current=True) + if streamed_result.trace: + streamed_result.trace.finish(reset_current=True) + if not streamed_result.is_complete: + streamed_result.is_complete = True + streamed_result._event_queue.put_nowait(QueueCompleteSentinel()) + + @classmethod + async def _run_single_turn_streamed( + cls, + streamed_result: RunResultStreaming, + agent: Agent[TContext], + hooks: RunHooks[TContext], + context_wrapper: RunContextWrapper[TContext], + run_config: RunConfig, + should_run_agent_start_hooks: bool, + tool_use_tracker: AgentToolUseTracker, + all_tools: list[Tool], + ) -> SingleStepResult: + """Execute a single turn of the ReAct loop with streaming.""" + # emitted_tool_call_ids: set[str] = set() + # emitted_reasoning_item_ids: set[str] = set() + + # Run agent start hooks + if should_run_agent_start_hooks: + await asyncio.gather( + hooks.on_agent_start(context_wrapper, agent), + agent.hooks.on_start(context_wrapper, agent) if agent.hooks else _coro.noop_coroutine(), + ) + + output_schema = cls._get_output_schema(agent) + streamed_result.current_agent = agent + streamed_result._current_agent_output_schema = output_schema + + # Get prompts and handoffs + system_prompt, prompt_config = await asyncio.gather( + agent.get_system_prompt(context_wrapper), + agent.get_prompt(context_wrapper), + ) + handoffs = await cls._get_handoffs(agent, context_wrapper) + model = cls._get_model(agent, run_config) + model_settings = agent.model_settings.resolve(run_config.model_settings) + model_settings = RunImpl.maybe_reset_tool_choice(agent, tool_use_tracker, model_settings) + + # Prepare input + input_items = ItemHelpers.input_to_new_input_list(streamed_result.input) + input_items.extend([item.to_input_item() for item in streamed_result.new_items]) + + # Optional input filter + filtered = await cls._maybe_filter_model_input( + agent=agent, + run_config=run_config, + context_wrapper=context_wrapper, + input_items=input_items, + system_instructions=system_prompt, + ) + + # Call on_llm_start hooks + await asyncio.gather( + hooks.on_llm_start(context_wrapper, agent, filtered.instructions, filtered.input), + agent.hooks.on_llm_start(context_wrapper, agent, filtered.instructions, filtered.input) + if agent.hooks + else _coro.noop_coroutine(), + ) + + final_response: ModelResponse | None = None + + # Stream model response + async for event in model.stream_response( + filtered.instructions, + filtered.input, + model_settings, + all_tools, + output_schema, + handoffs, + get_model_tracing_impl(run_config.tracing_disabled, run_config.trace_include_sensitive_data), + prompt=prompt_config, + ): + # Emit raw event + streamed_result._event_queue.put_nowait(RawResponsesStreamEvent(data=event)) + + if isinstance(event, ResponseCompletedEvent): + usage = ( + Usage( + requests=1, + input_tokens=event.response.usage.input_tokens, + output_tokens=event.response.usage.output_tokens, + total_tokens=event.response.usage.total_tokens, + input_tokens_details=event.response.usage.input_tokens_details, + output_tokens_details=event.response.usage.output_tokens_details, + ) + if event.response.usage + else Usage() + ) + final_response = ModelResponse( + output=event.response.output, + usage=usage, + response_id=event.response.id, + ) + context_wrapper.usage.add(usage) + + # Call on_llm_end hooks + if final_response is not None: + await asyncio.gather( + agent.hooks.on_llm_end(context_wrapper, agent, final_response) + if agent.hooks + else _coro.noop_coroutine(), + hooks.on_llm_end(context_wrapper, agent, final_response), + ) + + if not final_response: + raise ModelBehaviorError("Model did not produce a final response!") + + # Process response and execute tools + single_step_result = await cls._get_single_step_result_from_response( + agent=agent, + original_input=streamed_result.input, + pre_step_items=streamed_result.new_items, + new_response=final_response, + output_schema=output_schema, + all_tools=all_tools, + handoffs=handoffs, + hooks=hooks, + context_wrapper=context_wrapper, + run_config=run_config, + tool_use_tracker=tool_use_tracker, + event_queue=streamed_result._event_queue, + ) + + # Stream step results to queue + RunImpl.stream_step_result_to_queue(single_step_result, streamed_result._event_queue) + return single_step_result + + @classmethod + async def _get_single_step_result_from_response( + cls, + agent: Agent[TContext], + original_input: str | list[TResponseInputItem], + pre_step_items: list[RunItem], + new_response: ModelResponse, + output_schema: AgentOutputSchemaBase | None, + all_tools: list[Tool], + handoffs: list[Handoff], + hooks: RunHooks[TContext], + context_wrapper: RunContextWrapper[TContext], + run_config: RunConfig, + tool_use_tracker: AgentToolUseTracker, + event_queue: asyncio.Queue | None = None, + ) -> SingleStepResult: + """Process model response and execute tools.""" + processed_response = RunImpl.process_model_response( + agent=agent, + all_tools=all_tools, + response=new_response, + output_schema=output_schema, + handoffs=handoffs, + ) + tool_use_tracker.add_tool_use(agent, processed_response.tools_used) + + # Stream handoff items immediately + if event_queue is not None and processed_response.new_items: + handoff_items = [item for item in processed_response.new_items if isinstance(item, HandoffCallItem)] + if handoff_items: + RunImpl.stream_step_items_to_queue(cast(list[RunItem], handoff_items), event_queue) + + return await RunImpl.execute_tools_and_side_effects( + agent=agent, + original_input=original_input, + pre_step_items=pre_step_items, + new_response=new_response, + processed_response=processed_response, + output_schema=output_schema, + hooks=hooks, + context_wrapper=context_wrapper, + run_config=run_config, + ) + + # Helper methods + @staticmethod + def _copy_input(input: str | list[TResponseInputItem]) -> str | list[TResponseInputItem]: + if isinstance(input, str): + return input + return list(input) + + @staticmethod + def _get_output_schema(agent: Agent) -> AgentOutputSchemaBase | None: + if agent.output_type is None or agent.output_type is str: + return None + return AgentOutputSchema(agent.output_type) + + @staticmethod + def _get_model(agent: Agent, run_config: RunConfig) -> Model: + if isinstance(run_config.model, Model): + return run_config.model + elif isinstance(run_config.model, str): + return run_config.model_provider.get_model(run_config.model) + elif isinstance(agent.model, Model): + return agent.model + + return run_config.model_provider.get_model(agent.model) + + @staticmethod + async def _get_all_tools(agent: Agent[TContext], context_wrapper: RunContextWrapper[TContext]) -> list[Tool]: + return await agent.get_all_tools(context_wrapper) + + @staticmethod + async def _get_handoffs(agent: Agent[TContext], context_wrapper: RunContextWrapper[TContext]) -> list[Handoff]: + resolved = [] + for h in agent.handoffs: + if isinstance(h, Handoff): + resolved.append(h) + elif isinstance(h, Agent): + resolved.append(handoff(h)) + elif callable(h): + result = h(context_wrapper) + if inspect.isawaitable(result): + result = await result + if isinstance(result, Handoff): + resolved.append(result) + elif isinstance(result, Agent): + resolved.append(handoff(result)) + return resolved + + @staticmethod + async def _maybe_filter_model_input( + agent: Agent[TContext], + run_config: RunConfig, + context_wrapper: RunContextWrapper[TContext], + input_items: list[TResponseInputItem], + system_instructions: str | None, + ): + """Apply optional input filter.""" + effective_instructions = system_instructions + effective_input = input_items + + if run_config.call_model_input_filter is None: + return ModelInputData(input=effective_input, instructions=effective_instructions) + + model_input = ModelInputData(input=effective_input.copy(), instructions=effective_instructions) + filter_payload = CallModelData(model_data=model_input, agent=agent, context=context_wrapper.context) + maybe_updated = run_config.call_model_input_filter(filter_payload) + updated = await maybe_updated if inspect.isawaitable(maybe_updated) else maybe_updated + return updated + + @classmethod + async def _prepare_input_with_session( + cls, + input: str | list[TResponseInputItem], + session: Session | None, + session_input_callback=None, + ) -> str | list[TResponseInputItem]: + """Prepare input by combining it with session history if enabled.""" + if session is None: + return input + + # If the user doesn't specify an input callback and pass a list as input + if isinstance(input, list) and not session_input_callback: + raise UserError( + "When using session memory, list inputs require a " + "`RunConfig.session_input_callback` to define how they should be merged " + "with the conversation history. If you don't want to use a callback, " + "provide your input as a string instead, or disable session memory " + "(session=None) and pass a list to manage the history manually." + ) + + # Get previous conversation history + history = await session.get_items() + + # Convert input to list format + new_input_list = ItemHelpers.input_to_new_input_list(input) + + if session_input_callback is None: + return history + new_input_list + elif callable(session_input_callback): + res = session_input_callback(history, new_input_list) + if inspect.isawaitable(res): + return await res + return res + else: + raise UserError( + f"Invalid `session_input_callback` value: {session_input_callback}. " + "Choose between `None` or a custom callable function." + ) + + @classmethod + async def _save_result_to_session( + cls, + session: Session | None, + original_input: str | list[TResponseInputItem], + new_items: list[RunItem], + ) -> None: + """Save the conversation turn to session.""" + if session is None: + return + + # Convert original input to list format if needed + input_list = ItemHelpers.input_to_new_input_list(original_input) + + # Convert new items to input format + new_items_as_input = [item.to_input_item() for item in new_items] + + # Save all items from this turn + items_to_save = input_list + new_items_as_input + await session.add_items(items_to_save)