diff --git a/lib/cli/src/crewai_cli/kickoff_flow.py b/lib/cli/src/crewai_cli/kickoff_flow.py index bde1ddee73..71b6079eff 100644 --- a/lib/cli/src/crewai_cli/kickoff_flow.py +++ b/lib/cli/src/crewai_cli/kickoff_flow.py @@ -65,8 +65,12 @@ def _load_conversational_flow_from_kickoff_script() -> Any | None: def _run_conversational_flow_tui(flow: Any) -> Any: + from crewai.events.event_listener import EventListener + from crewai_cli.crew_run_tui import CrewRunApp + EventListener() # ensures we get events from the TUI + app = CrewRunApp( crew_name=getattr(flow, "name", None) or type(flow).__name__, conversational=True, diff --git a/lib/cli/tests/test_kickoff_flow.py b/lib/cli/tests/test_kickoff_flow.py index 52eb299ee8..b22201404f 100644 --- a/lib/cli/tests/test_kickoff_flow.py +++ b/lib/cli/tests/test_kickoff_flow.py @@ -61,3 +61,40 @@ class Result: kickoff_flow.kickoff_flow() assert calls == [["uv", "run", "kickoff"]] + + +def test_run_conversational_flow_tui_initializes_event_listener(monkeypatch) -> None: + calls: list[str] = [] + + class FakeEventListener: + def __init__(self) -> None: + calls.append("listener") + + class FakeCrewRunApp: + def __init__(self, *, crew_name: str, conversational: bool) -> None: + calls.append("app") + self.crew_name = crew_name + self.conversational = conversational + self._status = "completed" + self._crew_result = "done" + self._flow = None + + def run(self) -> None: + calls.append("run") + + class DemoFlow: + name = "Demo" + + monkeypatch.setattr( + "crewai.events.event_listener.EventListener", + FakeEventListener, + ) + monkeypatch.setattr( + "crewai_cli.crew_run_tui.CrewRunApp", + FakeCrewRunApp, + ) + + result = kickoff_flow._run_conversational_flow_tui(DemoFlow()) + + assert result == "done" + assert calls == ["listener", "app", "run"] diff --git a/lib/crewai/src/crewai/events/__init__.py b/lib/crewai/src/crewai/events/__init__.py index ce4a01a228..ceb6271e99 100644 --- a/lib/crewai/src/crewai/events/__init__.py +++ b/lib/crewai/src/crewai/events/__init__.py @@ -63,6 +63,9 @@ from crewai.events.types.flow_events import ( ConversationMessageAddedEvent, ConversationRouteSelectedEvent, + ConversationTurnCompletedEvent, + ConversationTurnFailedEvent, + ConversationTurnStartedEvent, FlowCreatedEvent, FlowEvent, FlowFinishedEvent, @@ -185,6 +188,9 @@ "CrewTrainStartedEvent": "crewai.events.types.crew_events", "ConversationMessageAddedEvent": "crewai.events.types.flow_events", "ConversationRouteSelectedEvent": "crewai.events.types.flow_events", + "ConversationTurnCompletedEvent": "crewai.events.types.flow_events", + "ConversationTurnFailedEvent": "crewai.events.types.flow_events", + "ConversationTurnStartedEvent": "crewai.events.types.flow_events", "FlowCreatedEvent": "crewai.events.types.flow_events", "FlowEvent": "crewai.events.types.flow_events", "FlowFinishedEvent": "crewai.events.types.flow_events", @@ -305,6 +311,9 @@ def __getattr__(name: str) -> Any: "CircularDependencyError", "ConversationMessageAddedEvent", "ConversationRouteSelectedEvent", + "ConversationTurnCompletedEvent", + "ConversationTurnFailedEvent", + "ConversationTurnStartedEvent", "CrewKickoffCompletedEvent", "CrewKickoffFailedEvent", "CrewKickoffStartedEvent", diff --git a/lib/crewai/src/crewai/events/event_listener.py b/lib/crewai/src/crewai/events/event_listener.py index 883147478d..a1a771f44a 100644 --- a/lib/crewai/src/crewai/events/event_listener.py +++ b/lib/crewai/src/crewai/events/event_listener.py @@ -41,6 +41,7 @@ DefaultEnvEvent, ) from crewai.events.types.flow_events import ( + ConversationTurnCompletedEvent, FlowCreatedEvent, FlowFinishedEvent, FlowPausedEvent, @@ -317,6 +318,12 @@ def on_flow_finished(source: Any, event: FlowFinishedEvent) -> None: source.flow_id, ) + @crewai_event_bus.on(ConversationTurnCompletedEvent) + def on_conversation_turn_completed( + _: Any, event: ConversationTurnCompletedEvent + ) -> None: + self._telemetry.feature_usage_span("flow:conversation_turn") + @crewai_event_bus.on(MethodExecutionStartedEvent) def on_method_execution_started( source: Any, event: MethodExecutionStartedEvent diff --git a/lib/crewai/src/crewai/events/event_types.py b/lib/crewai/src/crewai/events/event_types.py index f78278d506..8c589849ac 100644 --- a/lib/crewai/src/crewai/events/event_types.py +++ b/lib/crewai/src/crewai/events/event_types.py @@ -55,6 +55,9 @@ from crewai.events.types.flow_events import ( ConversationMessageAddedEvent, ConversationRouteSelectedEvent, + ConversationTurnCompletedEvent, + ConversationTurnFailedEvent, + ConversationTurnStartedEvent, FlowFinishedEvent, FlowStartedEvent, MethodExecutionFailedEvent, @@ -162,6 +165,9 @@ | TaskFailedEvent | ConversationMessageAddedEvent | ConversationRouteSelectedEvent + | ConversationTurnCompletedEvent + | ConversationTurnFailedEvent + | ConversationTurnStartedEvent | FlowStartedEvent | FlowFinishedEvent | MethodExecutionStartedEvent diff --git a/lib/crewai/src/crewai/events/types/flow_events.py b/lib/crewai/src/crewai/events/types/flow_events.py index 2af20c9794..8e33b384c6 100644 --- a/lib/crewai/src/crewai/events/types/flow_events.py +++ b/lib/crewai/src/crewai/events/types/flow_events.py @@ -184,6 +184,34 @@ class ConversationMessageAddedEvent(FlowEvent): type: Literal["conversation_message_added"] = "conversation_message_added" +class ConversationTurnStartedEvent(FlowEvent): + """Event emitted when a conversational Flow starts a user turn.""" + + session_id: str + type: Literal["conversation_turn_started"] = "conversation_turn_started" + + +class ConversationTurnCompletedEvent(FlowEvent): + """Event emitted when a conversational Flow completes a user turn.""" + + session_id: str + type: Literal["conversation_turn_completed"] = "conversation_turn_completed" + + +class ConversationTurnFailedEvent(FlowEvent): + """Event emitted when a conversational Flow turn fails.""" + + session_id: str + error: Exception + type: Literal["conversation_turn_failed"] = "conversation_turn_failed" + + model_config = ConfigDict(arbitrary_types_allowed=True) + + @field_serializer("error") + def _serialize_error(self, error: Exception) -> str: + return str(error) + + class ConversationRouteSelectedEvent(FlowEvent): """Event emitted when a conversational Flow selects a route for a turn.""" diff --git a/lib/crewai/src/crewai/experimental/conversational_mixin.py b/lib/crewai/src/crewai/experimental/conversational_mixin.py index 4f39565c0a..95826d9ee9 100644 --- a/lib/crewai/src/crewai/experimental/conversational_mixin.py +++ b/lib/crewai/src/crewai/experimental/conversational_mixin.py @@ -30,6 +30,9 @@ from crewai.events.types.flow_events import ( ConversationMessageAddedEvent, ConversationRouteSelectedEvent, + ConversationTurnCompletedEvent, + ConversationTurnFailedEvent, + ConversationTurnStartedEvent, ) from crewai.experimental.conversational import ( AgentMessage, @@ -280,6 +283,14 @@ def handle_turn( """ state = cast(ConversationState, self.state) sid = session_id or state.id + crewai_event_bus.emit( + self, + ConversationTurnStartedEvent( + type="conversation_turn_started", + flow_name=self.name or self.__class__.__name__, + session_id=sid, + ), + ) # Stash the pending turn so the kickoff extension hook picks it up # after persist restore. @@ -287,27 +298,62 @@ def handle_turn( self._pending_intents = list(intents) if intents else None self._pending_intent_llm = intent_llm - # Each turn is a fresh execution; clear graph tracking so the second - # turn re-runs instead of being treated as a checkpoint restore. - if "from_checkpoint" not in kickoff_kwargs: - self._reset_turn_execution_state() - - assistant_count = self._assistant_message_count() + failed_event: ConversationTurnFailedEvent | None = None try: + # Each turn is a fresh execution; clear graph tracking so the second + # turn re-runs instead of being treated as a checkpoint restore. + if "from_checkpoint" not in kickoff_kwargs: + self._reset_turn_execution_state() + + assistant_count = self._assistant_message_count() result = self.kickoff(inputs={"id": sid}, **kickoff_kwargs) + if ( + result is not None + and self._assistant_message_count() == assistant_count + and self._is_public_turn_result(result) + ): + self.append_assistant_message(self._stringify_result(result)) + except Exception as exc: + failed_event = ConversationTurnFailedEvent( + type="conversation_turn_failed", + flow_name=self.name or self.__class__.__name__, + session_id=sid, + error=exc, + ) + raise finally: self._pending_user_message = None self._pending_intents = None self._pending_intent_llm = None + if failed_event is not None: + self._emit_terminal_conversation_turn_event(failed_event) - if ( - result is not None - and self._assistant_message_count() == assistant_count - and self._is_public_turn_result(result) - ): - self.append_assistant_message(self._stringify_result(result)) + self._emit_terminal_conversation_turn_event( + ConversationTurnCompletedEvent( + type="conversation_turn_completed", + flow_name=self.name or self.__class__.__name__, + session_id=sid, + ), + ) return result + def _emit_terminal_conversation_turn_event( + self, + event: ConversationTurnCompletedEvent | ConversationTurnFailedEvent, + ) -> None: + """Emit a terminal turn event and wait for its own handlers.""" + future = crewai_event_bus.emit(self, event) + if future is None: + return + try: + future.result(timeout=30) + except Exception: + logger.warning( + "%s handler failed or timed out", + event.__class__.__name__, + exc_info=True, + ) + def chat( self, *, diff --git a/lib/crewai/tests/test_flow_conversation.py b/lib/crewai/tests/test_flow_conversation.py index d8cc0bd374..dd20cc61d0 100644 --- a/lib/crewai/tests/test_flow_conversation.py +++ b/lib/crewai/tests/test_flow_conversation.py @@ -14,6 +14,9 @@ from crewai.events.types.flow_events import ( ConversationMessageAddedEvent, ConversationRouteSelectedEvent, + ConversationTurnCompletedEvent, + ConversationTurnFailedEvent, + ConversationTurnStartedEvent, FlowStartedEvent, MethodExecutionFinishedEvent, MethodExecutionStartedEvent, @@ -1123,6 +1126,140 @@ def capture_message( assert observed_events[0] == "flow_started" assert observed_events[1] == "conversation_message_added" + def test_handle_turn_emits_started_and_completed_for_each_conversational_turn( + self, + ) -> None: + """Each ``handle_turn()`` emits paired turn lifecycle events.""" + + @ConversationConfig(defer_trace_finalization=True) + class DeferredFlow(ConversationalFlow): + def route_turn(self, context: dict[str, Any]) -> str | None: + return "work" + + @listen("work") + def do_work(self) -> str: + self.append_assistant_message("worked") + return "worked" + + flow = DeferredFlow() + default_session_id = flow.state.id + turn_events: list[ + ConversationTurnStartedEvent | ConversationTurnCompletedEvent + ] = [] + + original_emit = crewai_event_bus.emit + + def capture_emit(source: Any, event: Any) -> Any: + if isinstance( + event, (ConversationTurnStartedEvent, ConversationTurnCompletedEvent) + ): + turn_events.append(event) + return original_emit(source, event) + + with patch.object(crewai_event_bus, "emit", side_effect=capture_emit): + flow.handle_turn("turn 1") + flow.handle_turn("turn 2", session_id="custom-session") + crewai_event_bus.flush() + + assert [event.type for event in turn_events] == [ + "conversation_turn_started", + "conversation_turn_completed", + "conversation_turn_started", + "conversation_turn_completed", + ] + assert turn_events[0].session_id == default_session_id + assert turn_events[1].session_id == default_session_id + assert turn_events[2].session_id == "custom-session" + assert turn_events[3].session_id == "custom-session" + + def test_handle_turn_emits_failed_instead_of_completed_when_turn_raises( + self, + ) -> None: + """Failed turns emit a terminal failure event without completion.""" + + @ConversationConfig(defer_trace_finalization=True) + class FailingFlow(ConversationalFlow): + def route_turn(self, context: dict[str, Any]) -> str | None: + return "work" + + @listen("work") + def do_work(self) -> str: + raise RuntimeError("turn exploded") + + flow = FailingFlow() + turn_events: list[ + ConversationTurnStartedEvent + | ConversationTurnCompletedEvent + | ConversationTurnFailedEvent + ] = [] + handled_failed_events: list[ConversationTurnFailedEvent] = [] + original_emit = crewai_event_bus.emit + + def capture_emit(source: Any, event: Any) -> Any: + if isinstance( + event, + ( + ConversationTurnStartedEvent, + ConversationTurnCompletedEvent, + ConversationTurnFailedEvent, + ), + ): + turn_events.append(event) + return original_emit(source, event) + + with ( + crewai_event_bus.scoped_handlers(), + patch.object(crewai_event_bus, "emit", side_effect=capture_emit), + ): + + @crewai_event_bus.on(ConversationTurnFailedEvent) + def capture_failed( + _: Any, event: ConversationTurnFailedEvent + ) -> None: + handled_failed_events.append(event) + + with pytest.raises(RuntimeError, match="turn exploded"): + flow.handle_turn("turn 1") + + assert [event.type for event in turn_events] == [ + "conversation_turn_started", + "conversation_turn_failed", + ] + assert turn_events[0].session_id == flow.state.id + failed_event = turn_events[1] + assert isinstance(failed_event, ConversationTurnFailedEvent) + assert failed_event.session_id == flow.state.id + assert str(failed_event.error) == "turn exploded" + assert handled_failed_events == [failed_event] + + def test_conversation_turn_completed_tracks_feature_usage(self) -> None: + """Completed conversation turns count conversational Flow usage.""" + from crewai.events.event_listener import event_listener + + @ConversationConfig(defer_trace_finalization=True) + class DeferredFlow(ConversationalFlow): + def route_turn(self, context: dict[str, Any]) -> str | None: + return "work" + + @listen("work") + def do_work(self) -> str: + self.append_assistant_message("worked") + return "worked" + + flow = DeferredFlow() + + with ( + crewai_event_bus.scoped_handlers(), + patch.object( + event_listener._telemetry, + "feature_usage_span", + ) as feature_usage_span, + ): + event_listener.setup_listeners(crewai_event_bus) + flow.handle_turn("turn 1") + + feature_usage_span.assert_any_call("flow:conversation_turn") + def test_route_event_uses_no_message_index_for_empty_transcript(self) -> None: """Route events do not reference index zero when no message exists."""