From e73de8499c72866f1073619a247faa6a0922d404 Mon Sep 17 00:00:00 2001 From: C1-BA-B1-F3 Date: Fri, 26 Jun 2026 22:58:55 +0800 Subject: [PATCH 1/3] fix: add missing ExecutorContext methods to AgentExecutor for human_input support AgentExecutor was missing three methods required by the ExecutorContext protocol: _invoke_loop, _ainvoke_loop, and _format_feedback_message. This caused Task(human_input=True) to crash with AttributeError after the default executor was swapped from CrewAgentExecutor to AgentExecutor in 1.15.0. Fixes #6347 Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent) Co-authored-by: Sisyphus --- .../src/crewai/experimental/agent_executor.py | 51 ++++++++++++++ .../tests/agents/test_agent_executor.py | 66 +++++++++++++++++++ 2 files changed, 117 insertions(+) diff --git a/lib/crewai/src/crewai/experimental/agent_executor.py b/lib/crewai/src/crewai/experimental/agent_executor.py index 0ecd8e63a4..2323bf35fc 100644 --- a/lib/crewai/src/crewai/experimental/agent_executor.py +++ b/lib/crewai/src/crewai/experimental/agent_executor.py @@ -3160,6 +3160,57 @@ def _format_prompt(prompt: str, inputs: dict[str, str]) -> str: prompt = prompt.replace("{tool_names}", inputs["tool_names"]) return prompt.replace("{tools}", inputs["tools"]) + def _invoke_loop(self) -> AgentFinish: + """Re-run the agent flow loop for a feedback iteration (sync). + + Resets execution state and re-invokes the Flow-based execution so that + the agent can process human feedback and produce a new answer. + + Returns: + New AgentFinish result after processing feedback. + """ + self._finalize_called = False + self.state.is_finished = False + self.kickoff() + result = self.state.current_answer + if not isinstance(result, AgentFinish): + raise RuntimeError( + "Agent execution ended without reaching a final answer." + ) + return result + + async def _ainvoke_loop(self) -> AgentFinish: + """Re-run the agent flow loop for a feedback iteration (async). + + Resets execution state and re-invokes the Flow-based execution so that + the agent can process human feedback and produce a new answer. + + Returns: + New AgentFinish result after processing feedback. + """ + self._finalize_called = False + self.state.is_finished = False + await self.kickoff_async() + result = self.state.current_answer + if not isinstance(result, AgentFinish): + raise RuntimeError( + "Agent execution ended without reaching a final answer." + ) + return result + + def _format_feedback_message(self, feedback: str) -> LLMMessage: + """Format human feedback as a message for the LLM. + + Args: + feedback: User feedback string. + + Returns: + Formatted message dict. + """ + return format_message_for_llm( + I18N_DEFAULT.slice("feedback_instructions").format(feedback=feedback) + ) + def _handle_human_feedback(self, formatted_answer: AgentFinish) -> AgentFinish: """Process human feedback and refine answer. diff --git a/lib/crewai/tests/agents/test_agent_executor.py b/lib/crewai/tests/agents/test_agent_executor.py index e4de4a484b..c356937b97 100644 --- a/lib/crewai/tests/agents/test_agent_executor.py +++ b/lib/crewai/tests/agents/test_agent_executor.py @@ -2391,3 +2391,69 @@ def test_anthropic_provider_has_image_block_converter(self): assert hasattr(AnthropicCompletion, "_convert_image_blocks"), ( "Anthropic provider must have _convert_image_blocks for auto-conversion" ) + + +# Human Input Protocol Compliance + + +class TestAgentExecutorHumanInputProtocol: + """Verify AgentExecutor implements the ExecutorContext protocol methods + required for human_input=True to work. + + Regression test for https://github.com/crewAIInc/crewAI/issues/6347 + """ + + def test_has_invoke_loop(self): + """AgentExecutor must have _invoke_loop for sync human feedback.""" + assert hasattr(AgentExecutor, "_invoke_loop"), ( + "AgentExecutor missing _invoke_loop — human_input=True will crash" + ) + assert callable(getattr(AgentExecutor, "_invoke_loop", None)) + + def test_has_ainvoke_loop(self): + """AgentExecutor must have _ainvoke_loop for async human feedback.""" + assert hasattr(AgentExecutor, "_ainvoke_loop"), ( + "AgentExecutor missing _ainvoke_loop — async human_input=True will crash" + ) + assert callable(getattr(AgentExecutor, "_ainvoke_loop", None)) + + def test_has_format_feedback_message(self): + """AgentExecutor must have _format_feedback_message for formatting feedback.""" + assert hasattr(AgentExecutor, "_format_feedback_message"), ( + "AgentExecutor missing _format_feedback_message — human_input=True will crash" + ) + assert callable(getattr(AgentExecutor, "_format_feedback_message", None)) + + def test_format_feedback_message_returns_dict(self): + """_format_feedback_message should return a dict with role and content.""" + executor = _build_executor() + result = executor._format_feedback_message("test feedback") + assert isinstance(result, dict), "Feedback message should be a dict" + assert "role" in result, "Feedback message must have 'role' key" + assert "content" in result, "Feedback message must have 'content' key" + assert "test feedback" in result["content"], ( + "Feedback message content should include the feedback text" + ) + + def test_protocol_methods_match_executor_context(self): + """All methods from ExecutorContext protocol must exist on AgentExecutor.""" + from crewai.core.providers.human_input import ExecutorContext + + # Get all method names from the protocol + protocol_methods = [ + name for name in dir(ExecutorContext) + if not name.startswith("_") or name in ( + "_invoke_loop", + "_ainvoke_loop", + "_format_feedback_message", + "_is_training_mode", + "_handle_crew_training_output", + ) + ] + + for method_name in protocol_methods: + if method_name.startswith("__"): + continue + assert hasattr(AgentExecutor, method_name), ( + f"AgentExecutor missing protocol method: {method_name}" + ) From 0ff7f7f8423dd5b82fcf46035d5614de6f42469a Mon Sep 17 00:00:00 2001 From: Sisyphus Date: Sun, 28 Jun 2026 15:17:56 +0800 Subject: [PATCH 2/3] fix: reset agent executor state between runs --- .../src/crewai/experimental/agent_executor.py | 63 +++++++++++-------- .../tests/agents/test_agent_executor.py | 56 +++++++++++++++++ 2 files changed, 93 insertions(+), 26 deletions(-) diff --git a/lib/crewai/src/crewai/experimental/agent_executor.py b/lib/crewai/src/crewai/experimental/agent_executor.py index 2323bf35fc..907567b208 100644 --- a/lib/crewai/src/crewai/experimental/agent_executor.py +++ b/lib/crewai/src/crewai/experimental/agent_executor.py @@ -263,6 +263,13 @@ def _downgrade_to_text_tool_calling(self) -> None: ) ) + def _reset_pipeline_state(self) -> None: + """Clear transient pipeline state before and after each execution.""" + self.state.iterations = 0 + self.state.current_answer = None + self.state.use_native_tools = False + self.state.pending_tool_calls.clear() + def _is_tool_call_list(self, response: list[Any]) -> bool: """Check if a response is a list of tool calls.""" return is_tool_call_list(response) @@ -2741,11 +2748,8 @@ def invoke( # Reset state for fresh execution self._finalize_called = False self.state.messages.clear() - self.state.iterations = 0 - self.state.current_answer = None + self._reset_pipeline_state() self.state.is_finished = False - self.state.use_native_tools = False - self.state.pending_tool_calls = [] self.state.plan = None self.state.plan_ready = False self.state.todos = TodoList() @@ -2819,6 +2823,7 @@ def invoke( handle_unknown_error(PRINTER, e, verbose=self.agent.verbose) raise finally: + self._reset_pipeline_state() self._is_executing = False async def invoke_async(self, inputs: dict[str, Any]) -> dict[str, Any]: @@ -2847,11 +2852,8 @@ async def invoke_async(self, inputs: dict[str, Any]) -> dict[str, Any]: # Reset state for fresh execution self._finalize_called = False self.state.messages.clear() - self.state.iterations = 0 - self.state.current_answer = None + self._reset_pipeline_state() self.state.is_finished = False - self.state.use_native_tools = False - self.state.pending_tool_calls = [] self.state.plan = None self.state.plan_ready = False self.state.todos = TodoList() @@ -2927,6 +2929,7 @@ async def invoke_async(self, inputs: dict[str, Any]) -> dict[str, Any]: handle_unknown_error(PRINTER, e, verbose=self.agent.verbose) raise finally: + self._reset_pipeline_state() self._is_executing = False async def ainvoke(self, inputs: dict[str, Any]) -> dict[str, Any]: @@ -3169,15 +3172,19 @@ def _invoke_loop(self) -> AgentFinish: Returns: New AgentFinish result after processing feedback. """ - self._finalize_called = False - self.state.is_finished = False - self.kickoff() - result = self.state.current_answer - if not isinstance(result, AgentFinish): - raise RuntimeError( - "Agent execution ended without reaching a final answer." - ) - return result + self._reset_pipeline_state() + try: + self._finalize_called = False + self.state.is_finished = False + self.kickoff() + result = self.state.current_answer + if not isinstance(result, AgentFinish): + raise RuntimeError( + "Agent execution ended without reaching a final answer." + ) + return result + finally: + self._reset_pipeline_state() async def _ainvoke_loop(self) -> AgentFinish: """Re-run the agent flow loop for a feedback iteration (async). @@ -3188,15 +3195,19 @@ async def _ainvoke_loop(self) -> AgentFinish: Returns: New AgentFinish result after processing feedback. """ - self._finalize_called = False - self.state.is_finished = False - await self.kickoff_async() - result = self.state.current_answer - if not isinstance(result, AgentFinish): - raise RuntimeError( - "Agent execution ended without reaching a final answer." - ) - return result + self._reset_pipeline_state() + try: + self._finalize_called = False + self.state.is_finished = False + await self.kickoff_async() + result = self.state.current_answer + if not isinstance(result, AgentFinish): + raise RuntimeError( + "Agent execution ended without reaching a final answer." + ) + return result + finally: + self._reset_pipeline_state() def _format_feedback_message(self, feedback: str) -> LLMMessage: """Format human feedback as a message for the LLM. diff --git a/lib/crewai/tests/agents/test_agent_executor.py b/lib/crewai/tests/agents/test_agent_executor.py index c356937b97..306be730f5 100644 --- a/lib/crewai/tests/agents/test_agent_executor.py +++ b/lib/crewai/tests/agents/test_agent_executor.py @@ -958,6 +958,62 @@ def mock_kickoff_side_effect(): mock_kickoff.assert_called_once() mock_save_to_memory.assert_called_once() + def test_invoke_loop_clears_transient_state_between_sequential_runs( + self, mock_dependencies + ): + """Feedback reruns must not inherit pipeline state from prior runs.""" + executor = _build_executor(**mock_dependencies) + start_states = [] + outputs = ["First result", "Second result"] + + def mock_kickoff_side_effect(): + start_states.append( + ( + executor.state.iterations, + executor.state.current_answer, + executor.state.use_native_tools, + list(executor.state.pending_tool_calls), + ) + ) + executor.state.iterations = 3 + executor.state.use_native_tools = True + executor.state.pending_tool_calls.append("tool_call") + executor.state.current_answer = AgentFinish( + thought="final thinking", + output=outputs.pop(0), + text="complete", + ) + + executor.state.iterations = 9 + executor.state.current_answer = AgentAction( + thought="stale", tool="search", tool_input="query", text="action" + ) + executor.state.use_native_tools = True + executor.state.pending_tool_calls.append("stale_tool_call") + + with patch.object( + AgentExecutor, "kickoff", side_effect=mock_kickoff_side_effect + ): + first_result = executor._invoke_loop() + + assert first_result.output == "First result" + assert executor.state.iterations == 0 + assert executor.state.current_answer is None + assert executor.state.use_native_tools is False + assert executor.state.pending_tool_calls == [] + + second_result = executor._invoke_loop() + + assert second_result.output == "Second result" + assert start_states == [ + (0, None, False, []), + (0, None, False, []), + ] + assert executor.state.iterations == 0 + assert executor.state.current_answer is None + assert executor.state.use_native_tools is False + assert executor.state.pending_tool_calls == [] + @patch.object(AgentExecutor, "kickoff") def test_invoke_failure_no_agent_finish(self, mock_kickoff, mock_dependencies): """Test invoke fails without AgentFinish.""" From a932da9814c6b1772db1e449a87c67ad4121f8ee Mon Sep 17 00:00:00 2001 From: Sisyphus Date: Mon, 29 Jun 2026 00:33:27 +0800 Subject: [PATCH 3/3] test: cover async agent executor state reset --- .../tests/agents/test_agent_executor.py | 57 +++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/lib/crewai/tests/agents/test_agent_executor.py b/lib/crewai/tests/agents/test_agent_executor.py index 306be730f5..12fa57cda6 100644 --- a/lib/crewai/tests/agents/test_agent_executor.py +++ b/lib/crewai/tests/agents/test_agent_executor.py @@ -1014,6 +1014,63 @@ def mock_kickoff_side_effect(): assert executor.state.use_native_tools is False assert executor.state.pending_tool_calls == [] + @pytest.mark.asyncio + async def test_ainvoke_loop_clears_transient_state_between_sequential_runs( + self, mock_dependencies + ): + """Async feedback reruns must not inherit pipeline state from prior runs.""" + executor = _build_executor(**mock_dependencies) + start_states = [] + outputs = ["First result", "Second result"] + + async def mock_kickoff_side_effect(): + start_states.append( + ( + executor.state.iterations, + executor.state.current_answer, + executor.state.use_native_tools, + list(executor.state.pending_tool_calls), + ) + ) + executor.state.iterations = 3 + executor.state.use_native_tools = True + executor.state.pending_tool_calls.append("tool_call") + executor.state.current_answer = AgentFinish( + thought="final thinking", + output=outputs.pop(0), + text="complete", + ) + + executor.state.iterations = 9 + executor.state.current_answer = AgentAction( + thought="stale", tool="search", tool_input="query", text="action" + ) + executor.state.use_native_tools = True + executor.state.pending_tool_calls.append("stale_tool_call") + + with patch.object( + AgentExecutor, "kickoff_async", side_effect=mock_kickoff_side_effect + ): + first_result = await executor._ainvoke_loop() + + assert first_result.output == "First result" + assert executor.state.iterations == 0 + assert executor.state.current_answer is None + assert executor.state.use_native_tools is False + assert executor.state.pending_tool_calls == [] + + second_result = await executor._ainvoke_loop() + + assert second_result.output == "Second result" + assert start_states == [ + (0, None, False, []), + (0, None, False, []), + ] + assert executor.state.iterations == 0 + assert executor.state.current_answer is None + assert executor.state.use_native_tools is False + assert executor.state.pending_tool_calls == [] + @patch.object(AgentExecutor, "kickoff") def test_invoke_failure_no_agent_finish(self, mock_kickoff, mock_dependencies): """Test invoke fails without AgentFinish."""