Skip to content
4 changes: 4 additions & 0 deletions lib/cli/src/crewai_cli/kickoff_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,12 @@ def _load_conversational_flow_from_kickoff_script() -> Any | None:


def _run_conversational_flow_tui(flow: Any) -> Any:
from crewai.events.event_listener import EventListener

from crewai_cli.crew_run_tui import CrewRunApp

EventListener() # ensures we get events from the TUI

app = CrewRunApp(
crew_name=getattr(flow, "name", None) or type(flow).__name__,
conversational=True,
Expand Down
37 changes: 37 additions & 0 deletions lib/cli/tests/test_kickoff_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,40 @@ class Result:
kickoff_flow.kickoff_flow()

assert calls == [["uv", "run", "kickoff"]]


def test_run_conversational_flow_tui_initializes_event_listener(monkeypatch) -> None:
calls: list[str] = []

class FakeEventListener:
def __init__(self) -> None:
calls.append("listener")

class FakeCrewRunApp:
def __init__(self, *, crew_name: str, conversational: bool) -> None:
calls.append("app")
self.crew_name = crew_name
self.conversational = conversational
self._status = "completed"
self._crew_result = "done"
self._flow = None

def run(self) -> None:
calls.append("run")

class DemoFlow:
name = "Demo"

monkeypatch.setattr(
"crewai.events.event_listener.EventListener",
FakeEventListener,
)
monkeypatch.setattr(
"crewai_cli.crew_run_tui.CrewRunApp",
FakeCrewRunApp,
)

result = kickoff_flow._run_conversational_flow_tui(DemoFlow())

assert result == "done"
assert calls == ["listener", "app", "run"]
9 changes: 9 additions & 0 deletions lib/crewai/src/crewai/events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@
from crewai.events.types.flow_events import (
ConversationMessageAddedEvent,
ConversationRouteSelectedEvent,
ConversationTurnCompletedEvent,
ConversationTurnFailedEvent,
ConversationTurnStartedEvent,
FlowCreatedEvent,
FlowEvent,
FlowFinishedEvent,
Expand Down Expand Up @@ -185,6 +188,9 @@
"CrewTrainStartedEvent": "crewai.events.types.crew_events",
"ConversationMessageAddedEvent": "crewai.events.types.flow_events",
"ConversationRouteSelectedEvent": "crewai.events.types.flow_events",
"ConversationTurnCompletedEvent": "crewai.events.types.flow_events",
"ConversationTurnFailedEvent": "crewai.events.types.flow_events",
"ConversationTurnStartedEvent": "crewai.events.types.flow_events",
"FlowCreatedEvent": "crewai.events.types.flow_events",
"FlowEvent": "crewai.events.types.flow_events",
"FlowFinishedEvent": "crewai.events.types.flow_events",
Expand Down Expand Up @@ -305,6 +311,9 @@ def __getattr__(name: str) -> Any:
"CircularDependencyError",
"ConversationMessageAddedEvent",
"ConversationRouteSelectedEvent",
"ConversationTurnCompletedEvent",
"ConversationTurnFailedEvent",
"ConversationTurnStartedEvent",
"CrewKickoffCompletedEvent",
"CrewKickoffFailedEvent",
"CrewKickoffStartedEvent",
Expand Down
7 changes: 7 additions & 0 deletions lib/crewai/src/crewai/events/event_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
DefaultEnvEvent,
)
from crewai.events.types.flow_events import (
ConversationTurnCompletedEvent,
FlowCreatedEvent,
FlowFinishedEvent,
FlowPausedEvent,
Expand Down Expand Up @@ -317,6 +318,12 @@ def on_flow_finished(source: Any, event: FlowFinishedEvent) -> None:
source.flow_id,
)

@crewai_event_bus.on(ConversationTurnCompletedEvent)
def on_conversation_turn_completed(
_: Any, event: ConversationTurnCompletedEvent
) -> None:
self._telemetry.feature_usage_span("flow:conversation_turn")

@crewai_event_bus.on(MethodExecutionStartedEvent)
def on_method_execution_started(
source: Any, event: MethodExecutionStartedEvent
Expand Down
6 changes: 6 additions & 0 deletions lib/crewai/src/crewai/events/event_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@
from crewai.events.types.flow_events import (
ConversationMessageAddedEvent,
ConversationRouteSelectedEvent,
ConversationTurnCompletedEvent,
ConversationTurnFailedEvent,
ConversationTurnStartedEvent,
FlowFinishedEvent,
FlowStartedEvent,
MethodExecutionFailedEvent,
Expand Down Expand Up @@ -162,6 +165,9 @@
| TaskFailedEvent
| ConversationMessageAddedEvent
| ConversationRouteSelectedEvent
| ConversationTurnCompletedEvent
| ConversationTurnFailedEvent
| ConversationTurnStartedEvent
| FlowStartedEvent
| FlowFinishedEvent
| MethodExecutionStartedEvent
Expand Down
28 changes: 28 additions & 0 deletions lib/crewai/src/crewai/events/types/flow_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,34 @@ class ConversationMessageAddedEvent(FlowEvent):
type: Literal["conversation_message_added"] = "conversation_message_added"


class ConversationTurnStartedEvent(FlowEvent):
"""Event emitted when a conversational Flow starts a user turn."""

session_id: str
type: Literal["conversation_turn_started"] = "conversation_turn_started"


class ConversationTurnCompletedEvent(FlowEvent):
"""Event emitted when a conversational Flow completes a user turn."""

session_id: str
type: Literal["conversation_turn_completed"] = "conversation_turn_completed"


class ConversationTurnFailedEvent(FlowEvent):
"""Event emitted when a conversational Flow turn fails."""

session_id: str
error: Exception
type: Literal["conversation_turn_failed"] = "conversation_turn_failed"

model_config = ConfigDict(arbitrary_types_allowed=True)

@field_serializer("error")
def _serialize_error(self, error: Exception) -> str:
return str(error)


class ConversationRouteSelectedEvent(FlowEvent):
"""Event emitted when a conversational Flow selects a route for a turn."""

Expand Down
70 changes: 58 additions & 12 deletions lib/crewai/src/crewai/experimental/conversational_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
from crewai.events.types.flow_events import (
ConversationMessageAddedEvent,
ConversationRouteSelectedEvent,
ConversationTurnCompletedEvent,
ConversationTurnFailedEvent,
ConversationTurnStartedEvent,
)
from crewai.experimental.conversational import (
AgentMessage,
Expand Down Expand Up @@ -280,34 +283,77 @@ def handle_turn(
"""
state = cast(ConversationState, self.state)
sid = session_id or state.id
crewai_event_bus.emit(
self,
ConversationTurnStartedEvent(
type="conversation_turn_started",
flow_name=self.name or self.__class__.__name__,
session_id=sid,
),
)

# Stash the pending turn so the kickoff extension hook picks it up
# after persist restore.
self._pending_user_message = message
self._pending_intents = list(intents) if intents else None
self._pending_intent_llm = intent_llm

# Each turn is a fresh execution; clear graph tracking so the second
# turn re-runs instead of being treated as a checkpoint restore.
if "from_checkpoint" not in kickoff_kwargs:
self._reset_turn_execution_state()

assistant_count = self._assistant_message_count()
failed_event: ConversationTurnFailedEvent | None = None
try:
# Each turn is a fresh execution; clear graph tracking so the second
# turn re-runs instead of being treated as a checkpoint restore.
if "from_checkpoint" not in kickoff_kwargs:
self._reset_turn_execution_state()

assistant_count = self._assistant_message_count()
result = self.kickoff(inputs={"id": sid}, **kickoff_kwargs)
if (
result is not None
and self._assistant_message_count() == assistant_count
and self._is_public_turn_result(result)
):
self.append_assistant_message(self._stringify_result(result))
except Exception as exc:
failed_event = ConversationTurnFailedEvent(
type="conversation_turn_failed",
flow_name=self.name or self.__class__.__name__,
session_id=sid,
error=exc,
)
raise
finally:
self._pending_user_message = None
self._pending_intents = None
self._pending_intent_llm = None
if failed_event is not None:
self._emit_terminal_conversation_turn_event(failed_event)

if (
result is not None
and self._assistant_message_count() == assistant_count
and self._is_public_turn_result(result)
):
self.append_assistant_message(self._stringify_result(result))
self._emit_terminal_conversation_turn_event(
ConversationTurnCompletedEvent(
type="conversation_turn_completed",
flow_name=self.name or self.__class__.__name__,
session_id=sid,
),
)
Comment thread
cursor[bot] marked this conversation as resolved.
return result

def _emit_terminal_conversation_turn_event(
self,
event: ConversationTurnCompletedEvent | ConversationTurnFailedEvent,
) -> None:
"""Emit a terminal turn event and wait for its own handlers."""
future = crewai_event_bus.emit(self, event)
if future is None:
return
try:
future.result(timeout=30)
except Exception:
Comment thread
lorenzejay marked this conversation as resolved.
logger.warning(
"%s handler failed or timed out",
event.__class__.__name__,
exc_info=True,
)

def chat(
self,
*,
Expand Down
Loading
Loading