From e183edd60c9ed764cf06b88e6c242e9942a4699c Mon Sep 17 00:00:00 2001 From: lorenzejay Date: Wed, 24 Jun 2026 12:02:23 -0700 Subject: [PATCH 1/4] Track conversational flow turn usage in telemetry --- lib/crewai/src/crewai/events/__init__.py | 3 + .../src/crewai/events/event_listener.py | 7 +++ lib/crewai/src/crewai/events/event_types.py | 2 + .../src/crewai/events/types/flow_events.py | 7 +++ .../experimental/conversational_mixin.py | 9 +++ lib/crewai/tests/test_flow_conversation.py | 61 +++++++++++++++++++ 6 files changed, 89 insertions(+) diff --git a/lib/crewai/src/crewai/events/__init__.py b/lib/crewai/src/crewai/events/__init__.py index ce4a01a228..0b7b944ad9 100644 --- a/lib/crewai/src/crewai/events/__init__.py +++ b/lib/crewai/src/crewai/events/__init__.py @@ -63,6 +63,7 @@ from crewai.events.types.flow_events import ( ConversationMessageAddedEvent, ConversationRouteSelectedEvent, + ConversationTurnStartedEvent, FlowCreatedEvent, FlowEvent, FlowFinishedEvent, @@ -185,6 +186,7 @@ "CrewTrainStartedEvent": "crewai.events.types.crew_events", "ConversationMessageAddedEvent": "crewai.events.types.flow_events", "ConversationRouteSelectedEvent": "crewai.events.types.flow_events", + "ConversationTurnStartedEvent": "crewai.events.types.flow_events", "FlowCreatedEvent": "crewai.events.types.flow_events", "FlowEvent": "crewai.events.types.flow_events", "FlowFinishedEvent": "crewai.events.types.flow_events", @@ -305,6 +307,7 @@ def __getattr__(name: str) -> Any: "CircularDependencyError", "ConversationMessageAddedEvent", "ConversationRouteSelectedEvent", + "ConversationTurnStartedEvent", "CrewKickoffCompletedEvent", "CrewKickoffFailedEvent", "CrewKickoffStartedEvent", diff --git a/lib/crewai/src/crewai/events/event_listener.py b/lib/crewai/src/crewai/events/event_listener.py index 883147478d..6af504a9f9 100644 --- a/lib/crewai/src/crewai/events/event_listener.py +++ b/lib/crewai/src/crewai/events/event_listener.py @@ -41,6 +41,7 @@ DefaultEnvEvent, ) from crewai.events.types.flow_events import ( + ConversationTurnStartedEvent, FlowCreatedEvent, FlowFinishedEvent, FlowPausedEvent, @@ -317,6 +318,12 @@ def on_flow_finished(source: Any, event: FlowFinishedEvent) -> None: source.flow_id, ) + @crewai_event_bus.on(ConversationTurnStartedEvent) + def on_conversation_turn_started( + _: Any, event: ConversationTurnStartedEvent + ) -> None: + self._telemetry.feature_usage_span("flow:conversation") + @crewai_event_bus.on(MethodExecutionStartedEvent) def on_method_execution_started( source: Any, event: MethodExecutionStartedEvent diff --git a/lib/crewai/src/crewai/events/event_types.py b/lib/crewai/src/crewai/events/event_types.py index f78278d506..2b5d92d7e0 100644 --- a/lib/crewai/src/crewai/events/event_types.py +++ b/lib/crewai/src/crewai/events/event_types.py @@ -55,6 +55,7 @@ from crewai.events.types.flow_events import ( ConversationMessageAddedEvent, ConversationRouteSelectedEvent, + ConversationTurnStartedEvent, FlowFinishedEvent, FlowStartedEvent, MethodExecutionFailedEvent, @@ -162,6 +163,7 @@ | TaskFailedEvent | ConversationMessageAddedEvent | ConversationRouteSelectedEvent + | ConversationTurnStartedEvent | FlowStartedEvent | FlowFinishedEvent | MethodExecutionStartedEvent diff --git a/lib/crewai/src/crewai/events/types/flow_events.py b/lib/crewai/src/crewai/events/types/flow_events.py index 2af20c9794..0a151649ac 100644 --- a/lib/crewai/src/crewai/events/types/flow_events.py +++ b/lib/crewai/src/crewai/events/types/flow_events.py @@ -184,6 +184,13 @@ class ConversationMessageAddedEvent(FlowEvent): type: Literal["conversation_message_added"] = "conversation_message_added" +class ConversationTurnStartedEvent(FlowEvent): + """Event emitted when a conversational Flow starts a user turn.""" + + session_id: str + type: Literal["conversation_turn_started"] = "conversation_turn_started" + + class ConversationRouteSelectedEvent(FlowEvent): """Event emitted when a conversational Flow selects a route for a turn.""" diff --git a/lib/crewai/src/crewai/experimental/conversational_mixin.py b/lib/crewai/src/crewai/experimental/conversational_mixin.py index 4f39565c0a..c3349d6786 100644 --- a/lib/crewai/src/crewai/experimental/conversational_mixin.py +++ b/lib/crewai/src/crewai/experimental/conversational_mixin.py @@ -30,6 +30,7 @@ from crewai.events.types.flow_events import ( ConversationMessageAddedEvent, ConversationRouteSelectedEvent, + ConversationTurnStartedEvent, ) from crewai.experimental.conversational import ( AgentMessage, @@ -280,6 +281,14 @@ def handle_turn( """ state = cast(ConversationState, self.state) sid = session_id or state.id + crewai_event_bus.emit( + self, + ConversationTurnStartedEvent( + type="conversation_turn_started", + flow_name=self.name or self.__class__.__name__, + session_id=sid, + ), + ) # Stash the pending turn so the kickoff extension hook picks it up # after persist restore. diff --git a/lib/crewai/tests/test_flow_conversation.py b/lib/crewai/tests/test_flow_conversation.py index 3fea6b4714..08a3398061 100644 --- a/lib/crewai/tests/test_flow_conversation.py +++ b/lib/crewai/tests/test_flow_conversation.py @@ -14,6 +14,7 @@ from crewai.events.types.flow_events import ( ConversationMessageAddedEvent, ConversationRouteSelectedEvent, + ConversationTurnStartedEvent, FlowStartedEvent, MethodExecutionFinishedEvent, MethodExecutionStartedEvent, @@ -1125,6 +1126,66 @@ def capture_message( assert observed_events[0] == "flow_started" assert observed_events[1] == "conversation_message_added" + def test_handle_turn_emits_turn_started_for_each_conversational_turn( + self, + ) -> None: + """Each ``handle_turn()`` emits a usage-friendly turn start event.""" + + @ConversationConfig(defer_trace_finalization=True) + class DeferredFlow(ConversationalFlow): + def route_turn(self, context: dict[str, Any]) -> str | None: + return "work" + + @listen("work") + def do_work(self) -> str: + self.append_assistant_message("worked") + return "worked" + + flow = DeferredFlow() + default_session_id = flow.state.id + turn_events: list[ConversationTurnStartedEvent] = [] + + with crewai_event_bus.scoped_handlers(): + + @crewai_event_bus.on(ConversationTurnStartedEvent) + def capture(_: Any, event: ConversationTurnStartedEvent) -> None: + turn_events.append(event) + + flow.handle_turn("turn 1") + flow.handle_turn("turn 2", session_id="custom-session") + crewai_event_bus.flush() + + assert [event.type for event in turn_events] == [ + "conversation_turn_started", + "conversation_turn_started", + ] + assert turn_events[0].session_id == default_session_id + assert turn_events[1].session_id == "custom-session" + + def test_conversation_turn_started_tracks_feature_usage(self) -> None: + """Conversation turn events count conversational Flow usage.""" + from crewai.events.event_listener import event_listener + + with ( + crewai_event_bus.scoped_handlers(), + patch.object( + event_listener._telemetry, + "feature_usage_span", + ) as feature_usage_span, + ): + event_listener.setup_listeners(crewai_event_bus) + crewai_event_bus.emit( + self, + ConversationTurnStartedEvent( + type="conversation_turn_started", + flow_name="ChatFlow", + session_id="session-1", + ), + ) + crewai_event_bus.flush() + + feature_usage_span.assert_any_call("flow:conversation") + def test_route_event_uses_no_message_index_for_empty_transcript(self) -> None: """Route events do not reference index zero when no message exists.""" From f507e4dfb6d573f60f6876811bafd040ef309563 Mon Sep 17 00:00:00 2001 From: lorenzejay Date: Wed, 24 Jun 2026 12:24:14 -0700 Subject: [PATCH 2/4] adjusted name to flow:conversation_turn --- lib/crewai/src/crewai/events/event_listener.py | 2 +- lib/crewai/tests/test_flow_conversation.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/crewai/src/crewai/events/event_listener.py b/lib/crewai/src/crewai/events/event_listener.py index 6af504a9f9..03cebf7935 100644 --- a/lib/crewai/src/crewai/events/event_listener.py +++ b/lib/crewai/src/crewai/events/event_listener.py @@ -322,7 +322,7 @@ def on_flow_finished(source: Any, event: FlowFinishedEvent) -> None: def on_conversation_turn_started( _: Any, event: ConversationTurnStartedEvent ) -> None: - self._telemetry.feature_usage_span("flow:conversation") + self._telemetry.feature_usage_span("flow:conversation_turn") @crewai_event_bus.on(MethodExecutionStartedEvent) def on_method_execution_started( diff --git a/lib/crewai/tests/test_flow_conversation.py b/lib/crewai/tests/test_flow_conversation.py index 08a3398061..4bbb15208a 100644 --- a/lib/crewai/tests/test_flow_conversation.py +++ b/lib/crewai/tests/test_flow_conversation.py @@ -1184,7 +1184,7 @@ def test_conversation_turn_started_tracks_feature_usage(self) -> None: ) crewai_event_bus.flush() - feature_usage_span.assert_any_call("flow:conversation") + feature_usage_span.assert_any_call("flow:conversation_turn") def test_route_event_uses_no_message_index_for_empty_transcript(self) -> None: """Route events do not reference index zero when no message exists.""" From 928a536a9fc1823e147cb9da50526ffda2a403bc Mon Sep 17 00:00:00 2001 From: lorenzejay Date: Thu, 25 Jun 2026 10:02:26 -0700 Subject: [PATCH 3/4] only mark on turn completed event --- lib/crewai/src/crewai/events/__init__.py | 6 ++ .../src/crewai/events/event_listener.py | 8 +- lib/crewai/src/crewai/events/event_types.py | 4 + .../src/crewai/events/types/flow_events.py | 21 +++++ .../experimental/conversational_mixin.py | 45 +++++++--- lib/crewai/tests/test_flow_conversation.py | 83 ++++++++++++++++--- 6 files changed, 140 insertions(+), 27 deletions(-) diff --git a/lib/crewai/src/crewai/events/__init__.py b/lib/crewai/src/crewai/events/__init__.py index 0b7b944ad9..ceb6271e99 100644 --- a/lib/crewai/src/crewai/events/__init__.py +++ b/lib/crewai/src/crewai/events/__init__.py @@ -63,6 +63,8 @@ from crewai.events.types.flow_events import ( ConversationMessageAddedEvent, ConversationRouteSelectedEvent, + ConversationTurnCompletedEvent, + ConversationTurnFailedEvent, ConversationTurnStartedEvent, FlowCreatedEvent, FlowEvent, @@ -186,6 +188,8 @@ "CrewTrainStartedEvent": "crewai.events.types.crew_events", "ConversationMessageAddedEvent": "crewai.events.types.flow_events", "ConversationRouteSelectedEvent": "crewai.events.types.flow_events", + "ConversationTurnCompletedEvent": "crewai.events.types.flow_events", + "ConversationTurnFailedEvent": "crewai.events.types.flow_events", "ConversationTurnStartedEvent": "crewai.events.types.flow_events", "FlowCreatedEvent": "crewai.events.types.flow_events", "FlowEvent": "crewai.events.types.flow_events", @@ -307,6 +311,8 @@ def __getattr__(name: str) -> Any: "CircularDependencyError", "ConversationMessageAddedEvent", "ConversationRouteSelectedEvent", + "ConversationTurnCompletedEvent", + "ConversationTurnFailedEvent", "ConversationTurnStartedEvent", "CrewKickoffCompletedEvent", "CrewKickoffFailedEvent", diff --git a/lib/crewai/src/crewai/events/event_listener.py b/lib/crewai/src/crewai/events/event_listener.py index 03cebf7935..a1a771f44a 100644 --- a/lib/crewai/src/crewai/events/event_listener.py +++ b/lib/crewai/src/crewai/events/event_listener.py @@ -41,7 +41,7 @@ DefaultEnvEvent, ) from crewai.events.types.flow_events import ( - ConversationTurnStartedEvent, + ConversationTurnCompletedEvent, FlowCreatedEvent, FlowFinishedEvent, FlowPausedEvent, @@ -318,9 +318,9 @@ def on_flow_finished(source: Any, event: FlowFinishedEvent) -> None: source.flow_id, ) - @crewai_event_bus.on(ConversationTurnStartedEvent) - def on_conversation_turn_started( - _: Any, event: ConversationTurnStartedEvent + @crewai_event_bus.on(ConversationTurnCompletedEvent) + def on_conversation_turn_completed( + _: Any, event: ConversationTurnCompletedEvent ) -> None: self._telemetry.feature_usage_span("flow:conversation_turn") diff --git a/lib/crewai/src/crewai/events/event_types.py b/lib/crewai/src/crewai/events/event_types.py index 2b5d92d7e0..8c589849ac 100644 --- a/lib/crewai/src/crewai/events/event_types.py +++ b/lib/crewai/src/crewai/events/event_types.py @@ -55,6 +55,8 @@ from crewai.events.types.flow_events import ( ConversationMessageAddedEvent, ConversationRouteSelectedEvent, + ConversationTurnCompletedEvent, + ConversationTurnFailedEvent, ConversationTurnStartedEvent, FlowFinishedEvent, FlowStartedEvent, @@ -163,6 +165,8 @@ | TaskFailedEvent | ConversationMessageAddedEvent | ConversationRouteSelectedEvent + | ConversationTurnCompletedEvent + | ConversationTurnFailedEvent | ConversationTurnStartedEvent | FlowStartedEvent | FlowFinishedEvent diff --git a/lib/crewai/src/crewai/events/types/flow_events.py b/lib/crewai/src/crewai/events/types/flow_events.py index 0a151649ac..8e33b384c6 100644 --- a/lib/crewai/src/crewai/events/types/flow_events.py +++ b/lib/crewai/src/crewai/events/types/flow_events.py @@ -191,6 +191,27 @@ class ConversationTurnStartedEvent(FlowEvent): type: Literal["conversation_turn_started"] = "conversation_turn_started" +class ConversationTurnCompletedEvent(FlowEvent): + """Event emitted when a conversational Flow completes a user turn.""" + + session_id: str + type: Literal["conversation_turn_completed"] = "conversation_turn_completed" + + +class ConversationTurnFailedEvent(FlowEvent): + """Event emitted when a conversational Flow turn fails.""" + + session_id: str + error: Exception + type: Literal["conversation_turn_failed"] = "conversation_turn_failed" + + model_config = ConfigDict(arbitrary_types_allowed=True) + + @field_serializer("error") + def _serialize_error(self, error: Exception) -> str: + return str(error) + + class ConversationRouteSelectedEvent(FlowEvent): """Event emitted when a conversational Flow selects a route for a turn.""" diff --git a/lib/crewai/src/crewai/experimental/conversational_mixin.py b/lib/crewai/src/crewai/experimental/conversational_mixin.py index c3349d6786..d989b91b77 100644 --- a/lib/crewai/src/crewai/experimental/conversational_mixin.py +++ b/lib/crewai/src/crewai/experimental/conversational_mixin.py @@ -30,6 +30,8 @@ from crewai.events.types.flow_events import ( ConversationMessageAddedEvent, ConversationRouteSelectedEvent, + ConversationTurnCompletedEvent, + ConversationTurnFailedEvent, ConversationTurnStartedEvent, ) from crewai.experimental.conversational import ( @@ -296,25 +298,44 @@ def handle_turn( self._pending_intents = list(intents) if intents else None self._pending_intent_llm = intent_llm - # Each turn is a fresh execution; clear graph tracking so the second - # turn re-runs instead of being treated as a checkpoint restore. - if "from_checkpoint" not in kickoff_kwargs: - self._reset_turn_execution_state() - - assistant_count = self._assistant_message_count() try: + # Each turn is a fresh execution; clear graph tracking so the second + # turn re-runs instead of being treated as a checkpoint restore. + if "from_checkpoint" not in kickoff_kwargs: + self._reset_turn_execution_state() + + assistant_count = self._assistant_message_count() result = self.kickoff(inputs={"id": sid}, **kickoff_kwargs) + if ( + result is not None + and self._assistant_message_count() == assistant_count + and self._is_public_turn_result(result) + ): + self.append_assistant_message(self._stringify_result(result)) + except Exception as exc: + crewai_event_bus.emit( + self, + ConversationTurnFailedEvent( + type="conversation_turn_failed", + flow_name=self.name or self.__class__.__name__, + session_id=sid, + error=exc, + ), + ) + raise finally: self._pending_user_message = None self._pending_intents = None self._pending_intent_llm = None - if ( - result is not None - and self._assistant_message_count() == assistant_count - and self._is_public_turn_result(result) - ): - self.append_assistant_message(self._stringify_result(result)) + crewai_event_bus.emit( + self, + ConversationTurnCompletedEvent( + type="conversation_turn_completed", + flow_name=self.name or self.__class__.__name__, + session_id=sid, + ), + ) return result def chat( diff --git a/lib/crewai/tests/test_flow_conversation.py b/lib/crewai/tests/test_flow_conversation.py index 4bbb15208a..425adca828 100644 --- a/lib/crewai/tests/test_flow_conversation.py +++ b/lib/crewai/tests/test_flow_conversation.py @@ -14,6 +14,8 @@ from crewai.events.types.flow_events import ( ConversationMessageAddedEvent, ConversationRouteSelectedEvent, + ConversationTurnCompletedEvent, + ConversationTurnFailedEvent, ConversationTurnStartedEvent, FlowStartedEvent, MethodExecutionFinishedEvent, @@ -1126,10 +1128,10 @@ def capture_message( assert observed_events[0] == "flow_started" assert observed_events[1] == "conversation_message_added" - def test_handle_turn_emits_turn_started_for_each_conversational_turn( + def test_handle_turn_emits_started_and_completed_for_each_conversational_turn( self, ) -> None: - """Each ``handle_turn()`` emits a usage-friendly turn start event.""" + """Each ``handle_turn()`` emits paired turn lifecycle events.""" @ConversationConfig(defer_trace_finalization=True) class DeferredFlow(ConversationalFlow): @@ -1143,27 +1145,86 @@ def do_work(self) -> str: flow = DeferredFlow() default_session_id = flow.state.id - turn_events: list[ConversationTurnStartedEvent] = [] + turn_events: list[ + ConversationTurnStartedEvent | ConversationTurnCompletedEvent + ] = [] - with crewai_event_bus.scoped_handlers(): + original_emit = crewai_event_bus.emit - @crewai_event_bus.on(ConversationTurnStartedEvent) - def capture(_: Any, event: ConversationTurnStartedEvent) -> None: + def capture_emit(source: Any, event: Any) -> Any: + if isinstance( + event, (ConversationTurnStartedEvent, ConversationTurnCompletedEvent) + ): turn_events.append(event) + return original_emit(source, event) + with patch.object(crewai_event_bus, "emit", side_effect=capture_emit): flow.handle_turn("turn 1") flow.handle_turn("turn 2", session_id="custom-session") crewai_event_bus.flush() assert [event.type for event in turn_events] == [ "conversation_turn_started", + "conversation_turn_completed", "conversation_turn_started", + "conversation_turn_completed", ] assert turn_events[0].session_id == default_session_id - assert turn_events[1].session_id == "custom-session" + assert turn_events[1].session_id == default_session_id + assert turn_events[2].session_id == "custom-session" + assert turn_events[3].session_id == "custom-session" + + def test_handle_turn_emits_failed_instead_of_completed_when_turn_raises( + self, + ) -> None: + """Failed turns emit a terminal failure event without completion.""" + + @ConversationConfig(defer_trace_finalization=True) + class FailingFlow(ConversationalFlow): + def route_turn(self, context: dict[str, Any]) -> str | None: + return "work" + + @listen("work") + def do_work(self) -> str: + raise RuntimeError("turn exploded") + + flow = FailingFlow() + turn_events: list[ + ConversationTurnStartedEvent + | ConversationTurnCompletedEvent + | ConversationTurnFailedEvent + ] = [] + original_emit = crewai_event_bus.emit + + def capture_emit(source: Any, event: Any) -> Any: + if isinstance( + event, + ( + ConversationTurnStartedEvent, + ConversationTurnCompletedEvent, + ConversationTurnFailedEvent, + ), + ): + turn_events.append(event) + return original_emit(source, event) - def test_conversation_turn_started_tracks_feature_usage(self) -> None: - """Conversation turn events count conversational Flow usage.""" + with patch.object(crewai_event_bus, "emit", side_effect=capture_emit): + with pytest.raises(RuntimeError, match="turn exploded"): + flow.handle_turn("turn 1") + crewai_event_bus.flush() + + assert [event.type for event in turn_events] == [ + "conversation_turn_started", + "conversation_turn_failed", + ] + assert turn_events[0].session_id == flow.state.id + failed_event = turn_events[1] + assert isinstance(failed_event, ConversationTurnFailedEvent) + assert failed_event.session_id == flow.state.id + assert str(failed_event.error) == "turn exploded" + + def test_conversation_turn_completed_tracks_feature_usage(self) -> None: + """Completed conversation turns count conversational Flow usage.""" from crewai.events.event_listener import event_listener with ( @@ -1176,8 +1237,8 @@ def test_conversation_turn_started_tracks_feature_usage(self) -> None: event_listener.setup_listeners(crewai_event_bus) crewai_event_bus.emit( self, - ConversationTurnStartedEvent( - type="conversation_turn_started", + ConversationTurnCompletedEvent( + type="conversation_turn_completed", flow_name="ChatFlow", session_id="session-1", ), From 72a543cb35553608e9306418e7e51618f16cc30e Mon Sep 17 00:00:00 2001 From: lorenzejay Date: Thu, 25 Jun 2026 10:27:25 -0700 Subject: [PATCH 4/4] ensure tui also emits these events --- lib/cli/src/crewai_cli/kickoff_flow.py | 4 ++ lib/cli/tests/test_kickoff_flow.py | 37 +++++++++++++++++++ .../experimental/conversational_mixin.py | 36 +++++++++++++----- lib/crewai/tests/test_flow_conversation.py | 37 +++++++++++++------ 4 files changed, 93 insertions(+), 21 deletions(-) diff --git a/lib/cli/src/crewai_cli/kickoff_flow.py b/lib/cli/src/crewai_cli/kickoff_flow.py index bde1ddee73..71b6079eff 100644 --- a/lib/cli/src/crewai_cli/kickoff_flow.py +++ b/lib/cli/src/crewai_cli/kickoff_flow.py @@ -65,8 +65,12 @@ def _load_conversational_flow_from_kickoff_script() -> Any | None: def _run_conversational_flow_tui(flow: Any) -> Any: + from crewai.events.event_listener import EventListener + from crewai_cli.crew_run_tui import CrewRunApp + EventListener() # ensures we get events from the TUI + app = CrewRunApp( crew_name=getattr(flow, "name", None) or type(flow).__name__, conversational=True, diff --git a/lib/cli/tests/test_kickoff_flow.py b/lib/cli/tests/test_kickoff_flow.py index 52eb299ee8..b22201404f 100644 --- a/lib/cli/tests/test_kickoff_flow.py +++ b/lib/cli/tests/test_kickoff_flow.py @@ -61,3 +61,40 @@ class Result: kickoff_flow.kickoff_flow() assert calls == [["uv", "run", "kickoff"]] + + +def test_run_conversational_flow_tui_initializes_event_listener(monkeypatch) -> None: + calls: list[str] = [] + + class FakeEventListener: + def __init__(self) -> None: + calls.append("listener") + + class FakeCrewRunApp: + def __init__(self, *, crew_name: str, conversational: bool) -> None: + calls.append("app") + self.crew_name = crew_name + self.conversational = conversational + self._status = "completed" + self._crew_result = "done" + self._flow = None + + def run(self) -> None: + calls.append("run") + + class DemoFlow: + name = "Demo" + + monkeypatch.setattr( + "crewai.events.event_listener.EventListener", + FakeEventListener, + ) + monkeypatch.setattr( + "crewai_cli.crew_run_tui.CrewRunApp", + FakeCrewRunApp, + ) + + result = kickoff_flow._run_conversational_flow_tui(DemoFlow()) + + assert result == "done" + assert calls == ["listener", "app", "run"] diff --git a/lib/crewai/src/crewai/experimental/conversational_mixin.py b/lib/crewai/src/crewai/experimental/conversational_mixin.py index d989b91b77..95826d9ee9 100644 --- a/lib/crewai/src/crewai/experimental/conversational_mixin.py +++ b/lib/crewai/src/crewai/experimental/conversational_mixin.py @@ -298,6 +298,7 @@ def handle_turn( self._pending_intents = list(intents) if intents else None self._pending_intent_llm = intent_llm + failed_event: ConversationTurnFailedEvent | None = None try: # Each turn is a fresh execution; clear graph tracking so the second # turn re-runs instead of being treated as a checkpoint restore. @@ -313,23 +314,21 @@ def handle_turn( ): self.append_assistant_message(self._stringify_result(result)) except Exception as exc: - crewai_event_bus.emit( - self, - ConversationTurnFailedEvent( - type="conversation_turn_failed", - flow_name=self.name or self.__class__.__name__, - session_id=sid, - error=exc, - ), + failed_event = ConversationTurnFailedEvent( + type="conversation_turn_failed", + flow_name=self.name or self.__class__.__name__, + session_id=sid, + error=exc, ) raise finally: self._pending_user_message = None self._pending_intents = None self._pending_intent_llm = None + if failed_event is not None: + self._emit_terminal_conversation_turn_event(failed_event) - crewai_event_bus.emit( - self, + self._emit_terminal_conversation_turn_event( ConversationTurnCompletedEvent( type="conversation_turn_completed", flow_name=self.name or self.__class__.__name__, @@ -338,6 +337,23 @@ def handle_turn( ) return result + def _emit_terminal_conversation_turn_event( + self, + event: ConversationTurnCompletedEvent | ConversationTurnFailedEvent, + ) -> None: + """Emit a terminal turn event and wait for its own handlers.""" + future = crewai_event_bus.emit(self, event) + if future is None: + return + try: + future.result(timeout=30) + except Exception: + logger.warning( + "%s handler failed or timed out", + event.__class__.__name__, + exc_info=True, + ) + def chat( self, *, diff --git a/lib/crewai/tests/test_flow_conversation.py b/lib/crewai/tests/test_flow_conversation.py index 425adca828..a5849eb180 100644 --- a/lib/crewai/tests/test_flow_conversation.py +++ b/lib/crewai/tests/test_flow_conversation.py @@ -1194,6 +1194,7 @@ def do_work(self) -> str: | ConversationTurnCompletedEvent | ConversationTurnFailedEvent ] = [] + handled_failed_events: list[ConversationTurnFailedEvent] = [] original_emit = crewai_event_bus.emit def capture_emit(source: Any, event: Any) -> Any: @@ -1208,10 +1209,19 @@ def capture_emit(source: Any, event: Any) -> Any: turn_events.append(event) return original_emit(source, event) - with patch.object(crewai_event_bus, "emit", side_effect=capture_emit): + with ( + crewai_event_bus.scoped_handlers(), + patch.object(crewai_event_bus, "emit", side_effect=capture_emit), + ): + + @crewai_event_bus.on(ConversationTurnFailedEvent) + def capture_failed( + _: Any, event: ConversationTurnFailedEvent + ) -> None: + handled_failed_events.append(event) + with pytest.raises(RuntimeError, match="turn exploded"): flow.handle_turn("turn 1") - crewai_event_bus.flush() assert [event.type for event in turn_events] == [ "conversation_turn_started", @@ -1222,11 +1232,24 @@ def capture_emit(source: Any, event: Any) -> Any: assert isinstance(failed_event, ConversationTurnFailedEvent) assert failed_event.session_id == flow.state.id assert str(failed_event.error) == "turn exploded" + assert handled_failed_events == [failed_event] def test_conversation_turn_completed_tracks_feature_usage(self) -> None: """Completed conversation turns count conversational Flow usage.""" from crewai.events.event_listener import event_listener + @ConversationConfig(defer_trace_finalization=True) + class DeferredFlow(ConversationalFlow): + def route_turn(self, context: dict[str, Any]) -> str | None: + return "work" + + @listen("work") + def do_work(self) -> str: + self.append_assistant_message("worked") + return "worked" + + flow = DeferredFlow() + with ( crewai_event_bus.scoped_handlers(), patch.object( @@ -1235,15 +1258,7 @@ def test_conversation_turn_completed_tracks_feature_usage(self) -> None: ) as feature_usage_span, ): event_listener.setup_listeners(crewai_event_bus) - crewai_event_bus.emit( - self, - ConversationTurnCompletedEvent( - type="conversation_turn_completed", - flow_name="ChatFlow", - session_id="session-1", - ), - ) - crewai_event_bus.flush() + flow.handle_turn("turn 1") feature_usage_span.assert_any_call("flow:conversation_turn")