Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 70 additions & 8 deletions lib/crewai/src/crewai/experimental/agent_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,13 @@ def _downgrade_to_text_tool_calling(self) -> None:
)
)

def _reset_pipeline_state(self) -> None:
"""Clear transient pipeline state before and after each execution."""
self.state.iterations = 0
self.state.current_answer = None
self.state.use_native_tools = False
self.state.pending_tool_calls.clear()

def _is_tool_call_list(self, response: list[Any]) -> bool:
"""Check if a response is a list of tool calls."""
return is_tool_call_list(response)
Expand Down Expand Up @@ -2741,11 +2748,8 @@ def invoke(
# Reset state for fresh execution
self._finalize_called = False
self.state.messages.clear()
self.state.iterations = 0
self.state.current_answer = None
self._reset_pipeline_state()
self.state.is_finished = False
self.state.use_native_tools = False
self.state.pending_tool_calls = []
self.state.plan = None
self.state.plan_ready = False
self.state.todos = TodoList()
Expand Down Expand Up @@ -2819,6 +2823,7 @@ def invoke(
handle_unknown_error(PRINTER, e, verbose=self.agent.verbose)
raise
finally:
self._reset_pipeline_state()
self._is_executing = False

async def invoke_async(self, inputs: dict[str, Any]) -> dict[str, Any]:
Expand Down Expand Up @@ -2847,11 +2852,8 @@ async def invoke_async(self, inputs: dict[str, Any]) -> dict[str, Any]:
# Reset state for fresh execution
self._finalize_called = False
self.state.messages.clear()
self.state.iterations = 0
self.state.current_answer = None
self._reset_pipeline_state()
self.state.is_finished = False
self.state.use_native_tools = False
self.state.pending_tool_calls = []
self.state.plan = None
self.state.plan_ready = False
self.state.todos = TodoList()
Expand Down Expand Up @@ -2927,6 +2929,7 @@ async def invoke_async(self, inputs: dict[str, Any]) -> dict[str, Any]:
handle_unknown_error(PRINTER, e, verbose=self.agent.verbose)
raise
finally:
self._reset_pipeline_state()
self._is_executing = False

async def ainvoke(self, inputs: dict[str, Any]) -> dict[str, Any]:
Expand Down Expand Up @@ -3160,6 +3163,65 @@ def _format_prompt(prompt: str, inputs: dict[str, str]) -> str:
prompt = prompt.replace("{tool_names}", inputs["tool_names"])
return prompt.replace("{tools}", inputs["tools"])

def _invoke_loop(self) -> AgentFinish:
"""Re-run the agent flow loop for a feedback iteration (sync).

Resets execution state and re-invokes the Flow-based execution so that
the agent can process human feedback and produce a new answer.

Returns:
New AgentFinish result after processing feedback.
"""
self._reset_pipeline_state()
try:
self._finalize_called = False
self.state.is_finished = False
self.kickoff()
result = self.state.current_answer
if not isinstance(result, AgentFinish):
raise RuntimeError(
"Agent execution ended without reaching a final answer."
)
return result
finally:
self._reset_pipeline_state()

async def _ainvoke_loop(self) -> AgentFinish:
"""Re-run the agent flow loop for a feedback iteration (async).

Resets execution state and re-invokes the Flow-based execution so that
the agent can process human feedback and produce a new answer.

Returns:
New AgentFinish result after processing feedback.
"""
self._reset_pipeline_state()
try:
self._finalize_called = False
self.state.is_finished = False
await self.kickoff_async()
result = self.state.current_answer
if not isinstance(result, AgentFinish):
raise RuntimeError(
"Agent execution ended without reaching a final answer."
)
return result
finally:
self._reset_pipeline_state()

def _format_feedback_message(self, feedback: str) -> LLMMessage:
"""Format human feedback as a message for the LLM.

Args:
feedback: User feedback string.

Returns:
Formatted message dict.
"""
return format_message_for_llm(
I18N_DEFAULT.slice("feedback_instructions").format(feedback=feedback)
)

def _handle_human_feedback(self, formatted_answer: AgentFinish) -> AgentFinish:
"""Process human feedback and refine answer.

Expand Down
122 changes: 122 additions & 0 deletions lib/crewai/tests/agents/test_agent_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -958,6 +958,62 @@ def mock_kickoff_side_effect():
mock_kickoff.assert_called_once()
mock_save_to_memory.assert_called_once()

def test_invoke_loop_clears_transient_state_between_sequential_runs(
self, mock_dependencies
):
"""Feedback reruns must not inherit pipeline state from prior runs."""
executor = _build_executor(**mock_dependencies)
start_states = []
outputs = ["First result", "Second result"]

def mock_kickoff_side_effect():
start_states.append(
(
executor.state.iterations,
executor.state.current_answer,
executor.state.use_native_tools,
list(executor.state.pending_tool_calls),
)
)
executor.state.iterations = 3
executor.state.use_native_tools = True
executor.state.pending_tool_calls.append("tool_call")
executor.state.current_answer = AgentFinish(
thought="final thinking",
output=outputs.pop(0),
text="complete",
)

executor.state.iterations = 9
executor.state.current_answer = AgentAction(
thought="stale", tool="search", tool_input="query", text="action"
)
executor.state.use_native_tools = True
executor.state.pending_tool_calls.append("stale_tool_call")

with patch.object(
AgentExecutor, "kickoff", side_effect=mock_kickoff_side_effect
):
first_result = executor._invoke_loop()

assert first_result.output == "First result"
assert executor.state.iterations == 0
assert executor.state.current_answer is None
assert executor.state.use_native_tools is False
assert executor.state.pending_tool_calls == []

second_result = executor._invoke_loop()

assert second_result.output == "Second result"
assert start_states == [
(0, None, False, []),
(0, None, False, []),
]
assert executor.state.iterations == 0
assert executor.state.current_answer is None
assert executor.state.use_native_tools is False
assert executor.state.pending_tool_calls == []

@patch.object(AgentExecutor, "kickoff")
def test_invoke_failure_no_agent_finish(self, mock_kickoff, mock_dependencies):
"""Test invoke fails without AgentFinish."""
Expand Down Expand Up @@ -2391,3 +2447,69 @@ def test_anthropic_provider_has_image_block_converter(self):
assert hasattr(AnthropicCompletion, "_convert_image_blocks"), (
"Anthropic provider must have _convert_image_blocks for auto-conversion"
)


# Human Input Protocol Compliance


class TestAgentExecutorHumanInputProtocol:
"""Verify AgentExecutor implements the ExecutorContext protocol methods
required for human_input=True to work.

Regression test for https://github.com/crewAIInc/crewAI/issues/6347
"""

def test_has_invoke_loop(self):
"""AgentExecutor must have _invoke_loop for sync human feedback."""
assert hasattr(AgentExecutor, "_invoke_loop"), (
"AgentExecutor missing _invoke_loop — human_input=True will crash"
)
assert callable(getattr(AgentExecutor, "_invoke_loop", None))

def test_has_ainvoke_loop(self):
"""AgentExecutor must have _ainvoke_loop for async human feedback."""
assert hasattr(AgentExecutor, "_ainvoke_loop"), (
"AgentExecutor missing _ainvoke_loop — async human_input=True will crash"
)
assert callable(getattr(AgentExecutor, "_ainvoke_loop", None))

def test_has_format_feedback_message(self):
"""AgentExecutor must have _format_feedback_message for formatting feedback."""
assert hasattr(AgentExecutor, "_format_feedback_message"), (
"AgentExecutor missing _format_feedback_message — human_input=True will crash"
)
assert callable(getattr(AgentExecutor, "_format_feedback_message", None))

def test_format_feedback_message_returns_dict(self):
"""_format_feedback_message should return a dict with role and content."""
executor = _build_executor()
result = executor._format_feedback_message("test feedback")
assert isinstance(result, dict), "Feedback message should be a dict"
assert "role" in result, "Feedback message must have 'role' key"
assert "content" in result, "Feedback message must have 'content' key"
assert "test feedback" in result["content"], (
"Feedback message content should include the feedback text"
)

def test_protocol_methods_match_executor_context(self):
"""All methods from ExecutorContext protocol must exist on AgentExecutor."""
from crewai.core.providers.human_input import ExecutorContext

# Get all method names from the protocol
protocol_methods = [
name for name in dir(ExecutorContext)
if not name.startswith("_") or name in (
"_invoke_loop",
"_ainvoke_loop",
"_format_feedback_message",
"_is_training_mode",
"_handle_crew_training_output",
)
]

for method_name in protocol_methods:
if method_name.startswith("__"):
continue
assert hasattr(AgentExecutor, method_name), (
f"AgentExecutor missing protocol method: {method_name}"
)